You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/05 03:53:57 UTC

[incubator-seatunnel] branch dev updated: [Connector-V2] Add Kudu source and sink connector (#2254)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0483cbc2d [Connector-V2] Add Kudu source and sink connector (#2254)
0483cbc2d is described below

commit 0483cbc2df2458db24beec61d71b6d62520b4dea
Author: liuyehan <81...@users.noreply.github.com>
AuthorDate: Fri Aug 5 11:53:52 2022 +0800

    [Connector-V2] Add Kudu source and sink connector (#2254)
    
    * 0
    
    * Update pom.xml
    
    add kudu dependency
    
    * Update plugin-mapping.properties
    
    add kudu config
    
    * Update pom.xml
    
    add kudu
    
    * add email sink connector
    
    * Delete seatunnel-connectors-v2/connector-email directory
    
    * Update plugin-mapping.properties
    
    * Update plugin-mapping.properties
    
    * Update pom.xml
    
    * Create pom.xml
    
    * [Connector-V2] Add Kudu source and sink connector
    
    * [Connector-V2] Add Kudu source and sink connector
    
    * [Connector-V2] update
    
    * [Connector-V2] solve ci error
    
    * [Connector-V2] fix problem on code review
    
    * [Connector-V2] add kudu usage document and fix problem on code review
    
    * Update ExceptionUtil.java
    
    * Update pom.xml
    
    * Update Kudu.md
    
    * Update ExceptionUtil.java
    
    Co-authored-by: Hisoka <fa...@qq.com>
---
 docs/en/connector-v2/sink/Kudu.md                  |  39 ++++
 docs/en/connector-v2/source/Kudu.md                |  41 +++++
 plugin-mapping.properties                          |   2 +
 pom.xml                                            |   7 +
 .../org/apache/seatunnel/common/ExceptionUtil.java |  37 ++++
 seatunnel-connectors-v2-dist/pom.xml               |   5 +
 .../{ => connector-kudu}/pom.xml                   |  60 +++---
 .../seatunnel/kudu/config/KuduSinkConfig.java      |  64 +++++++
 .../seatunnel/kudu/config/KuduSourceConfig.java    |  29 +++
 .../seatunnel/kudu/kuduclient/KuduInputFormat.java | 202 +++++++++++++++++++++
 .../kudu/kuduclient/KuduOutputFormat.java          | 164 +++++++++++++++++
 .../seatunnel/kudu/kuduclient/KuduTypeMapper.java  | 103 +++++++++++
 .../connectors/seatunnel/kudu/sink/KuduSink.java   |  69 +++++++
 .../seatunnel/kudu/sink/KuduSinkWriter.java        |  64 +++++++
 .../seatunnel/kudu/source/KuduSource.java          | 191 +++++++++++++++++++
 .../seatunnel/kudu/source/KuduSourceReader.java    | 107 +++++++++++
 .../seatunnel/kudu/source/KuduSourceSplit.java     |  39 ++++
 .../kudu/source/KuduSourceSplitEnumerator.java     | 131 +++++++++++++
 .../seatunnel/kudu/source/PartitionParameter.java  |  32 ++++
 .../seatunnel/kudu/state/KuduSourceState.java      |  23 +++
 .../src/main/resources/kudu_to_kudu_flink.conf     |  60 ++++++
 .../src/main/resources/kudu_to_kudu_spark.conf     |  64 +++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 23 files changed, 1500 insertions(+), 34 deletions(-)

diff --git a/docs/en/connector-v2/sink/Kudu.md b/docs/en/connector-v2/sink/Kudu.md
new file mode 100644
index 000000000..4ccb993dc
--- /dev/null
+++ b/docs/en/connector-v2/sink/Kudu.md
@@ -0,0 +1,39 @@
+# Kudu
+
+## Description
+
+Write data to Kudu.
+
+ The tested kudu version is 1.11.1.
+
+## Options
+
+| name                     | type    | required | default value |
+|--------------------------|---------|----------|---------------|
+| kudu_master             | string  | yes      | -             |
+| kudu_table               | string  | yes      | -             |
+| save_mode               | string  | yes      | -             |
+
+### kudu_master [string]
+
+`kudu_master`  The address of kudu master,such as '192.168.88.110:7051'.
+
+### kudu_table [string]
+
+`kudu_table` The name of kudu table..
+
+### save_mode [string]
+
+Storage mode, we need support `overwrite` and `append`. `append` is now supported.
+
+## Example
+
+```bash
+
+ kuduSink {
+      kudu_master = "192.168.88.110:7051"
+      kudu_table = "studentlyhresultflink"
+      save_mode="append"
+   }
+
+```
diff --git a/docs/en/connector-v2/source/Kudu.md b/docs/en/connector-v2/source/Kudu.md
new file mode 100644
index 000000000..7b85bc7cd
--- /dev/null
+++ b/docs/en/connector-v2/source/Kudu.md
@@ -0,0 +1,41 @@
+# Kudu
+
+## Description
+
+Used to read data from Kudu. Currently, only supports Query with Batch Mode.
+
+ The tested kudu version is 1.11.1.
+
+## Options
+
+| name                     | type    | required | default value |
+|--------------------------|---------|----------|---------------|
+| kudu_master             | string  | yes      | -             |
+| kudu_table               | string  | yes      | -             |
+| columnsList               | string  | yes      | -             |
+
+### kudu_master [string]
+
+`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.
+
+### kudu_table [string]
+
+`kudu_table` The name of kudu table..
+
+### columnsList [string]
+
+`columnsList` Specifies the column names of the table.
+
+## Examples
+
+```hocon
+source {
+   KuduSource {
+      result_table_name = "studentlyh2"
+      kudu_master = "192.168.88.110:7051"
+      kudu_table = "studentlyh2"
+      columnsList = "id,name,age,sex"
+    }
+
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c448b86bc..c6c3f16f5 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -103,6 +103,8 @@ seatunnel.sink.Clickhouse = connector-clickhouse
 seatunnel.sink.ClickhouseFile = connector-clickhouse
 seatunnel.source.Jdbc = connector-jdbc
 seatunnel.sink.Jdbc = connector-jdbc
+seatunnel.source.Kudu = connector-kudu
+seatunnel.sink.Kudu = connector-kudu
 seatunnel.sink.Email = connector-email
 seatunnel.sink.HdfsFile = connector-file-hadoop
 seatunnel.sink.LocalFile = connector-file-local
diff --git a/pom.xml b/pom.xml
index 39927e48b..188a1ba57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
         <mongo-spark.version>2.2.0</mongo-spark.version>
         <spark-redis.version>2.6.0</spark-redis.version>
         <commons-lang3.version>3.4</commons-lang3.version>
+        <kudu.version>1.11.1</kudu.version>
         <email.version>1.5.6</email.version>
         <commons-collections4.version>4.4</commons-collections4.version>
         <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
@@ -263,6 +264,12 @@
                 <groupId>net.jpountz.lz4</groupId>
                 <artifactId>lz4</artifactId>
                 <version>1.3.0</version>
+            </dependency>
+             <!--kudu -->
+            <dependency>
+                <groupId>org.apache.kudu</groupId>
+                <artifactId>kudu-client</artifactId>
+                <version>${kudu.version}</version>
             </dependency>
             <!--email -->
             <dependency>
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/ExceptionUtil.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/ExceptionUtil.java
new file mode 100644
index 000000000..7ad7e7fc8
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/ExceptionUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class ExceptionUtil {
+    public static String getMessage(Throwable e) {
+        try (StringWriter sw = new StringWriter();
+             PrintWriter pw = new PrintWriter(sw)){
+            // Output the error stack information to the printWriter
+            e.printStackTrace(pw);
+            pw.flush();
+            sw.flush();
+            return sw.toString();
+        } catch (Exception e1) {
+            e1.printStackTrace();
+            throw new RuntimeException("Failed to print exception logs", e1);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index dd8d105bb..336c92b74 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -101,6 +101,11 @@
             <artifactId>connector-dingtalk</artifactId>
             <version>${project.version}</version>
         </dependency>
+         <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kudu</artifactId>
+            <version>${project.version}</version>
+          </dependency>
           <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-email</artifactId>
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/connector-kudu/pom.xml
similarity index 53%
copy from seatunnel-connectors-v2/pom.xml
copy to seatunnel-connectors-v2/connector-kudu/pom.xml
index b5ed52de7..6a3e5238d 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/connector-kudu/pom.xml
@@ -17,50 +17,42 @@
     limitations under the License.
 
 -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel</artifactId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-    <artifactId>seatunnel-connectors-v2</artifactId>
 
-    <modules>
-        <module>connector-common</module>
-        <module>connector-clickhouse</module>
-        <module>connector-console</module>
-        <module>connector-fake</module>
-        <module>connector-http</module>
-        <module>connector-jdbc</module>
-        <module>connector-kafka</module>
-        <module>connector-pulsar</module>
-        <module>connector-socket</module>
-        <module>connector-hive</module>
-        <module>connector-file</module>
-        <module>connector-hudi</module>
-        <module>connector-assert</module>
-        <module>connector-email</module>
-        <module>connector-dingtalk</module>
-    </modules>
+    <artifactId>connector-kudu</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <configuration>
-                    <skip>true</skip>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
 </project>
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
new file mode 100644
index 000000000..b44a3e730
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
+
+@Data
+public class KuduSinkConfig {
+
+    private static final String KUDU_SAVE_MODE = "save_mode";
+    private static final String KUDU_MASTER = "kudu_master";
+    private static final String KUDU_TABLE_NAME = "kudu_table";
+
+    private SaveMode saveMode = SaveMode.APPEND;
+
+    private String kuduMaster;
+
+    /**
+     * Specifies the name of the table
+     */
+    private String kuduTableName;
+
+    public enum SaveMode {
+        APPEND(),
+        OVERWRITE();
+
+        public static SaveMode fromStr(String str) {
+            if ("overwrite".equals(str)) {
+                return OVERWRITE;
+            } else {
+                return APPEND;
+            }
+        }
+    }
+
+    public KuduSinkConfig(@NonNull Config pluginConfig) {
+        if (pluginConfig.hasPath(KUDU_SAVE_MODE) && pluginConfig.hasPath(KUDU_MASTER) && pluginConfig.hasPath(KUDU_TABLE_NAME)) {
+            this.saveMode = StringUtils.isBlank(pluginConfig.getString(KUDU_SAVE_MODE)) ? SaveMode.APPEND : SaveMode.fromStr(pluginConfig.getString(KUDU_SAVE_MODE));
+            this.kuduMaster = pluginConfig.getString(KUDU_MASTER);
+            this.kuduTableName = pluginConfig.getString(KUDU_TABLE_NAME);
+        } else {
+            throw new RuntimeException("Missing Sink configuration parameters");
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
new file mode 100644
index 000000000..754b53114
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import java.io.Serializable;
+
+public class KuduSourceConfig implements Serializable {
+
+    public static final String KUDUMASTER = "kudu_master";
+    public static final String TABLENAME = "kudu_table";
+    public static final String COLUMNSLIST = "columnsList";
+
+
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
new file mode 100644
index 000000000..d00cc2bd3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.ExceptionUtil;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class KuduInputFormat implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduInputFormat.class);
+
+    public KuduInputFormat(String kuduMaster, String tableName, String columnsList) {
+        this.kuduMaster = kuduMaster;
+        this.columnsList = Arrays.asList(columnsList.split(","));
+        this.tableName = tableName;
+
+    }
+
+    /**
+     * Declare the global variable KuduClient and use it to manipulate the Kudu table
+     */
+    public KuduClient kuduClient;
+
+    /**
+     * Specify kuduMaster address
+     */
+    public String kuduMaster;
+    public List<String> columnsList;
+    public Schema schema;
+    public String keyColumn;
+    public static final int TIMEOUTMS = 18000;
+
+    /**
+     * Specifies the name of the table
+     */
+    public String tableName;
+
+    public List<ColumnSchema> getColumnsSchemas() {
+        List<ColumnSchema> columns = null;
+        try {
+            schema = kuduClient.openTable(tableName).getSchema();
+            keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
+            columns = schema.getColumns();
+        } catch (KuduException e) {
+            LOGGER.warn("get table Columns Schemas Fail.", e);
+            throw new RuntimeException("get table Columns Schemas Fail..", e);
+        }
+        return columns;
+    }
+
+    public static SeaTunnelRow getSeaTunnelRowData(RowResult rs, SeaTunnelRowType typeInfo) throws SQLException {
+
+        List<Object> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+        for (int i = 0; i < seaTunnelDataTypes.length; i++) {
+            Object seatunnelField;
+            SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i];
+            if (null == rs.getObject(i)) {
+                seatunnelField = null;
+            } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getBoolean(i);
+            } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getByte(i);
+            } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getShort(i);
+            } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getInt(i);
+            } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getLong(i);
+            } else if (seaTunnelDataType instanceof DecimalType) {
+                Object value = rs.getObject(i);
+                seatunnelField = value instanceof BigInteger ?
+                        new BigDecimal((BigInteger) value, 0)
+                        : value;
+            } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getFloat(i);
+            } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getDouble(i);
+            } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getString(i);
+            } else {
+                throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
+            }
+            fields.add(seatunnelField);
+        }
+
+        return new SeaTunnelRow(fields.toArray());
+    }
+
+    public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
+
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        try {
+
+            for (int i = 0; i < columnSchemaList.size(); i++) {
+                fieldNames.add(columnSchemaList.get(i).getName());
+                seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
+            }
+        } catch (Exception e) {
+            LOGGER.warn("get row type info exception.", e);
+            throw new PrepareFailException("kudu", PluginType.SOURCE, ExceptionUtil.getMessage(e));
+        }
+        return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+    }
+
+    public void openInputFormat() {
+        KuduClient.KuduClientBuilder kuduClientBuilder = new
+                KuduClient.KuduClientBuilder(kuduMaster);
+        kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
+
+        kuduClient = kuduClientBuilder.build();
+
+        LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
+
+    }
+
+    /**
+     * @param lowerBound The beginning of each slice
+     * @param upperBound End of each slice
+     * @return Get the kuduScanner object for each slice
+     */
+
+    public KuduScanner getKuduBuildSplit(int lowerBound, int upperBound) {
+        KuduScanner kuduScanner = null;
+        try {
+            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
+                    kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
+
+            kuduScannerBuilder.setProjectedColumnNames(columnsList);
+
+            KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(
+                    schema.getColumn("" + keyColumn),
+                    KuduPredicate.ComparisonOp.GREATER_EQUAL,
+                    lowerBound);
+
+            KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(
+                    schema.getColumn("" + keyColumn),
+                    KuduPredicate.ComparisonOp.LESS,
+                    upperBound);
+
+            kuduScanner = kuduScannerBuilder.addPredicate(lowerPred)
+                    .addPredicate(upperPred).build();
+        } catch (KuduException e) {
+            LOGGER.warn("get the Kuduscan object for each splice exception", e);
+            throw new RuntimeException("get the Kuduscan object for each splice exception.", e);
+        }
+        return kuduScanner;
+    }
+
+    public void closeInputFormat() {
+        if (kuduClient != null) {
+            try {
+                kuduClient.close();
+            } catch (KuduException e) {
+                LOGGER.warn("Kudu Client close failed.", e);
+                throw new RuntimeException("Kudu Client close failed.", e);
+            } finally {
+                kuduClient = null;
+            }
+        }
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
new file mode 100644
index 000000000..f73b191ca
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+
+/**
+ * A Kudu outputFormat
+ */
+public class KuduOutputFormat
+        implements Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);
+
+    private String kuduMaster;
+    private String kuduTableName;
+    private KuduClient kuduClient;
+    private KuduSession kuduSession;
+    private KuduTable kuduTable;
+    public static final long TIMEOUTMS = 18000;
+    public static final long SESSIONTIMEOUTMS = 100000;
+    public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
+        this.kuduMaster = kuduSinkConfig.getKuduMaster();
+        this.kuduTableName = kuduSinkConfig.getKuduTableName();
+        init();
+    }
+
+    public void write(SeaTunnelRow element) {
+
+        Insert insert = kuduTable.newInsert();
+        Schema schema = kuduTable.getSchema();
+
+        int columnCount = schema.getColumnCount();
+        PartialRow row = insert.getRow();
+        for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+            ColumnSchema col = schema.getColumnByIndex(columnIndex);
+            try {
+                switch (col.getType()) {
+                    case BOOL:
+                        row.addBoolean(columnIndex, (Boolean) element.getField(columnIndex));
+                        break;
+                    case INT8:
+                        row.addByte(columnIndex, (Byte) element.getField(columnIndex));
+                        break;
+                    case INT16:
+                        row.addShort(columnIndex, (Short) element.getField(columnIndex));
+                        break;
+                    case INT32:
+                        row.addInt(columnIndex, (Integer) element.getField(columnIndex));
+                        break;
+                    case INT64:
+                        row.addLong(columnIndex, (Long) element.getField(columnIndex));
+                        break;
+                    case UNIXTIME_MICROS:
+                        if (element.getField(columnIndex) instanceof Timestamp) {
+                            row.addTimestamp(columnIndex, (Timestamp) element.getField(columnIndex));
+                        } else {
+                            row.addLong(columnIndex, (Long) element.getField(columnIndex));
+                        }
+                        break;
+                    case FLOAT:
+                        row.addFloat(columnIndex, (Float) element.getField(columnIndex));
+                        break;
+                    case DOUBLE:
+                        row.addDouble(columnIndex, (Double) element.getField(columnIndex));
+                        break;
+                    case STRING:
+                        row.addString(columnIndex, element.getField(columnIndex).toString());
+                        break;
+                    case BINARY:
+                        if (element.getField(columnIndex) instanceof byte[]) {
+                            row.addBinary(columnIndex, (byte[]) element.getField(columnIndex));
+                        } else {
+                            row.addBinary(columnIndex, (ByteBuffer) element.getField(columnIndex));
+                        }
+                        break;
+                    case DECIMAL:
+                        row.addDecimal(columnIndex, (BigDecimal) element.getField(columnIndex));
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unsupported column type: " + col.getType());
+                }
+            } catch (ClassCastException e) {
+                e.printStackTrace();
+                throw new IllegalArgumentException(
+                        "Value type does not match column type " + col.getType() +
+                                " for column " + col.getName());
+            }
+
+        }
+
+        try {
+            kuduSession.apply(insert);
+        } catch (KuduException e) {
+            LOGGER.warn("kudu session insert data fail.", e);
+            throw new RuntimeException("kudu session insert data fail.", e);
+        }
+
+    }
+
+    public void init() {
+        KuduClient.KuduClientBuilder kuduClientBuilder = new
+                KuduClient.KuduClientBuilder(kuduMaster);
+        kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
+        this.kuduClient = kuduClientBuilder.build();
+        this.kuduSession = kuduClient.newSession();
+        this.kuduSession.setTimeoutMillis(SESSIONTIMEOUTMS);
+        this.kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+        try {
+            kuduTable = kuduClient.openTable(kuduTableName);
+        } catch (KuduException e) {
+            LOGGER.warn("Failed to initialize the Kudu client.", e);
+            throw new RuntimeException("Failed to initialize the Kudu client.", e);
+        }
+        LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
+    }
+
+    public void closeOutputFormat() {
+        if (kuduClient != null) {
+            try {
+                kuduClient.close();
+                kuduSession.close();
+            } catch (KuduException e) {
+                LOGGER.warn("Kudu Client close failed.", e);
+            } finally {
+                kuduClient = null;
+                kuduSession = null;
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
new file mode 100644
index 000000000..cea22dfb2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import org.apache.kudu.ColumnSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.List;
+
+public class KuduTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduTypeMapper.class);
+
+    // ============================data types=====================
+
+    private static final String KUDU_UNKNOWN = "UNKNOWN";
+    private static final String KUDU_BIT = "BOOL";
+
+    // -------------------------number----------------------------
+    private static final String KUDU_TINYINT = "INT8";
+    private static final String KUDU_MEDIUMINT = "INT32";
+    private static final String KUDU_INT = "INT16";
+    private static final String KUDU_BIGINT = "INT64";
+
+    private static final String KUDU_FLOAT = "FLOAT";
+
+    private static final String KUDU_DOUBLE = "DOUBLE";
+    private static final String KUDU_DECIMAL = "DECIMAL32";
+
+
+    // -------------------------string----------------------------
+
+    private static final String KUDU_VARCHAR = "STRING";
+
+
+    // ------------------------------time-------------------------
+
+    private static final String KUDU_UNIXTIME_MICROS = "UNIXTIME_MICROS";
+
+
+    // ------------------------------blob-------------------------
+
+    private static final String KUDU_BINARY = "BINARY";
+    private static final int PRECISION = 20;
+    public static SeaTunnelDataType<?> mapping(List<ColumnSchema> columnSchemaList, int colIndex) throws SQLException {
+        String kuduType = columnSchemaList.get(colIndex).getType().getName().toUpperCase();
+        switch (kuduType) {
+            case KUDU_BIT:
+                return BasicType.BOOLEAN_TYPE;
+            case KUDU_TINYINT:
+            case KUDU_MEDIUMINT:
+            case KUDU_INT:
+                return BasicType.INT_TYPE;
+            case KUDU_BIGINT:
+                return BasicType.LONG_TYPE;
+            case KUDU_DECIMAL:
+                return new DecimalType(PRECISION, 0);
+            case KUDU_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case KUDU_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+
+            case KUDU_VARCHAR:
+                return BasicType.STRING_TYPE;
+            case KUDU_UNIXTIME_MICROS:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case KUDU_BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+
+            //Doesn't support yet
+
+            case KUDU_UNKNOWN:
+            default:
+                throw new UnsupportedOperationException(
+                    String.format(
+                        "Doesn't support KUDU type '%s' .",
+                        kuduType));
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
new file mode 100644
index 000000000..2f907ae49
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+/**
+ * Kudu Sink implementation by using SeaTunnel sink API.
+ * This class contains the method to create {@link AbstractSimpleSink}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class KuduSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private Config config;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public String getPluginName() {
+        return "kuduSink";
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.config = pluginConfig;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
+        return new KuduSinkWriter(seaTunnelRowType, config);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
new file mode 100644
index 000000000..d78f48618
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduOutputFormat;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class KuduSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSinkWriter.class);
+
+    private SeaTunnelRowType seaTunnelRowType;
+    private Config pluginConfig;
+
+
+    private KuduOutputFormat fileWriter;
+
+    private KuduSinkConfig kuduSinkConfig;
+
+    public KuduSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowType,
+                          @NonNull Config pluginConfig) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.pluginConfig = pluginConfig;
+
+        kuduSinkConfig = new KuduSinkConfig(this.pluginConfig);
+        fileWriter = new KuduOutputFormat(kuduSinkConfig);
+
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        fileWriter.write(element);
+    }
+
+    @Override
+    public void close() throws IOException {
+        fileWriter.closeOutputFormat();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
new file mode 100644
index 000000000..65b004a6a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@AutoService(SeaTunnelSource.class)
+public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSourceState> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSource.class);
+
+    private SeaTunnelContext seaTunnelContext;
+    private SeaTunnelRowType rowTypeInfo;
+    private KuduInputFormat kuduInputFormat;
+    private PartitionParameter partitionParameter;
+    public static final int TIMEOUTMS = 18000;
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelRowType getProducedType() {
+        return this.rowTypeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, KuduSourceSplit> createReader(SourceReader.Context readerContext) {
+        return new KuduSourceReader(kuduInputFormat, readerContext);
+    }
+
+    @Override
+    public Serializer<KuduSourceSplit> getSplitSerializer() {
+        return SeaTunnelSource.super.getSplitSerializer();
+    }
+
+    @Override
+    public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> createEnumerator(
+            SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext) {
+        return new KuduSourceSplitEnumerator(enumeratorContext, partitionParameter);
+    }
+
+    @Override
+    public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> restoreEnumerator(
+            SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSourceState checkpointState) {
+        // todo:
+        return new KuduSourceSplitEnumerator(enumeratorContext, partitionParameter);
+    }
+
+    @Override
+    public Serializer<KuduSourceState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "KuduSource";
+    }
+
+    @Override
+    public void prepare(Config config) {
+        String kudumaster = "";
+        String tableName = "";
+        String columnslist = "";
+        if (config.hasPath(KuduSourceConfig.KUDUMASTER) && config.hasPath(KuduSourceConfig.KUDUMASTER) && config.hasPath(KuduSourceConfig.KUDUMASTER)) {
+            kudumaster = config.getString(KuduSourceConfig.KUDUMASTER);
+            tableName = config.getString(KuduSourceConfig.TABLENAME);
+            columnslist = config.getString(KuduSourceConfig.COLUMNSLIST);
+            kuduInputFormat = new KuduInputFormat(kudumaster, tableName, columnslist);
+        } else {
+            throw new RuntimeException("Missing Source configuration parameters");
+        }
+        try {
+            KuduClient.KuduClientBuilder kuduClientBuilder = new
+                    KuduClient.KuduClientBuilder(kudumaster);
+            kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
+
+            KuduClient kuduClient = kuduClientBuilder.build();
+            partitionParameter = initPartitionParameter(kuduClient, tableName);
+            SeaTunnelRowType seaTunnelRowType = getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
+            rowTypeInfo = seaTunnelRowType;
+        } catch (KuduException e) {
+            throw new RuntimeException("Parameters in the preparation phase fail to be generated", e);
+        }
+    }
+
+    private PartitionParameter initPartitionParameter(KuduClient kuduClient, String tableName) {
+        String keyColumn = null;
+        int maxKey = 0;
+        int minKey = 0;
+        boolean flag = true;
+        try {
+            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
+                    kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
+            ArrayList<String> columnsList = new ArrayList<String>();
+            keyColumn = kuduClient.openTable(tableName).getSchema().getPrimaryKeyColumns().get(0).getName();
+            columnsList.add("" + keyColumn);
+            kuduScannerBuilder.setProjectedColumnNames(columnsList);
+            KuduScanner kuduScanner = kuduScannerBuilder.build();
+            while (kuduScanner.hasMoreRows()) {
+                RowResultIterator rowResults = kuduScanner.nextRows();
+                while (rowResults.hasNext()) {
+                    RowResult row = rowResults.next();
+                    int id = row.getInt("" + keyColumn);
+                    if (flag) {
+                        maxKey = id;
+                        minKey = id;
+                        flag = false;
+                    } else {
+                        if (id >= maxKey) {
+                            maxKey = id;
+                        }
+                        if (id <= minKey) {
+                            minKey = id;
+                        }
+                    }
+                }
+            }
+        } catch (KuduException e) {
+            throw new RuntimeException("Failed to generate upper and lower limits for each partition", e);
+        }
+        return new PartitionParameter(keyColumn, Long.parseLong(minKey + ""), Long.parseLong(maxKey + ""));
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        try {
+
+            for (int i = 0; i < columnSchemaList.size(); i++) {
+                fieldNames.add(columnSchemaList.get(i).getName());
+                seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
+            }
+
+        } catch (Exception e) {
+            LOGGER.warn("get row type info exception", e);
+            throw new PrepareFailException("kudu", PluginType.SOURCE, e.toString());
+        }
+        return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
new file mode 100644
index 000000000..e3c3cd25d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+public class KuduSourceReader implements SourceReader<SeaTunnelRow, KuduSourceSplit> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSourceReader.class);
+
+    private final SourceReader.Context context;
+
+    private final KuduInputFormat kuduInputFormat;
+    Deque<KuduSourceSplit> splits = new LinkedList<>();
+
+    boolean noMoreSplit;
+
+    public KuduSourceReader(KuduInputFormat kuduInputFormat, SourceReader.Context context) {
+        this.context = context;
+        this.kuduInputFormat = kuduInputFormat;
+    }
+
+    @Override
+    public void open() {
+        kuduInputFormat.openInputFormat();
+    }
+
+    @Override
+    public void close() {
+        kuduInputFormat.closeInputFormat();
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        KuduSourceSplit split = splits.poll();
+        Object[] parameterValues = split.parameterValues;
+        int lowerBound = Integer.parseInt(parameterValues[0].toString());
+        int upperBound = Integer.parseInt(parameterValues[1].toString());
+        List<ColumnSchema> columnSchemaList = kuduInputFormat.getColumnsSchemas();
+        KuduScanner kuduScanner = kuduInputFormat.getKuduBuildSplit(lowerBound, upperBound);
+        //
+        while (kuduScanner.hasMoreRows()) {
+            RowResultIterator rowResults = kuduScanner.nextRows();
+            while (rowResults.hasNext()) {
+                RowResult rowResult = rowResults.next();
+                SeaTunnelRow seaTunnelRow = KuduInputFormat.getSeaTunnelRowData(rowResult, kuduInputFormat.getSeaTunnelRowType(columnSchemaList));
+                output.collect(seaTunnelRow);
+            }
+        }
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            LOGGER.info("Closed the bounded fake source");
+            context.signalNoMoreElement();
+        }
+
+    }
+
+    @Override
+    public List<KuduSourceSplit> snapshotState(long checkpointId) {
+        return null;
+    }
+
+    @Override
+    public void addSplits(List<KuduSourceSplit> splits) {
+        this.splits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java
new file mode 100644
index 000000000..4f1320f73
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class KuduSourceSplit implements SourceSplit {
+
+    private static final long serialVersionUID = -1L;
+
+    Object[] parameterValues;
+    public final Integer splitId;
+
+    @Override
+    public String splitId() {
+        return splitId.toString();
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
new file mode 100644
index 000000000..a2e54fef6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class KuduSourceSplitEnumerator implements SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> {
+
+    private final SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext;
+    private PartitionParameter partitionParameter;
+    List<KuduSourceSplit> allSplit = new ArrayList<>();
+    private Long maxVal;
+    private Long minVal;
+    private Long batchSize;
+    private Integer batchNum;
+
+    public KuduSourceSplitEnumerator(SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, PartitionParameter partitionParameter) {
+        this.enumeratorContext = enumeratorContext;
+        this.partitionParameter = partitionParameter;
+    }
+
+    @Override
+    public void open() {
+
+    }
+
+    @Override
+    public void run() {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
+
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        int parallelism = enumeratorContext.currentParallelism();
+        if (allSplit.isEmpty()) {
+            if (null != partitionParameter) {
+                Serializable[][] parameterValues = getParameterValues(partitionParameter.minValue, partitionParameter.maxValue, parallelism);
+                for (int i = 0; i < parameterValues.length; i++) {
+                    allSplit.add(new KuduSourceSplit(parameterValues[i], i));
+                }
+            } else {
+                allSplit.add(new KuduSourceSplit(null, 0));
+            }
+        }
+        // Filter the split that the current task needs to run
+        List<KuduSourceSplit> splits = allSplit.stream().filter(p -> p.splitId % parallelism == subtaskId).collect(Collectors.toList());
+        enumeratorContext.assignSplit(subtaskId, splits);
+        enumeratorContext.signalNoMoreSplits(subtaskId);
+    }
+
+    private Serializable[][] getParameterValues(Long minVal, Long maxVal, int parallelism) {
+        this.maxVal = maxVal;
+        this.minVal = minVal;
+        long maxElemCount = (maxVal - minVal) + 1;
+        batchNum = parallelism;
+        getBatchSizeAndBatchNum(parallelism);
+        long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
+
+        Serializable[][] parameters = new Serializable[batchNum][2];
+        long start = minVal;
+        for (int i = 0; i < batchNum; i++) {
+            long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
+            parameters[i] = new Long[]{start, end};
+            start = end + 1;
+        }
+        return parameters;
+
+    }
+
+    private void getBatchSizeAndBatchNum(int parallelism) {
+        batchNum = parallelism;
+        long maxElemCount = (maxVal - minVal) + 1;
+        if (batchNum > maxElemCount) {
+            batchNum = (int) maxElemCount;
+        }
+        this.batchNum = batchNum;
+        this.batchSize = new Double(Math.ceil((double) maxElemCount / batchNum)).longValue();
+    }
+
+    @Override
+    public KuduSourceState snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java
new file mode 100644
index 000000000..e79116466
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class PartitionParameter implements Serializable {
+
+    String partitionColumnName;
+    Long minValue;
+    Long maxValue;
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java
new file mode 100644
index 000000000..317bf375f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.state;
+
+import java.io.Serializable;
+
+public class KuduSourceState implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_flink.conf b/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_flink.conf
new file mode 100644
index 000000000..b04aeae68
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_flink.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 2
+  #job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    KuduSource {
+      result_table_name = "studentlyh2"
+      kudu_master = "192.168.88.110:7051"
+      kudu_table = "studentlyh2"
+      columnsList = "id,name,age,sex"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+      sql = "select id,name,age,sex from studentlyh2"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ kuduSink {
+      kudu_master = "192.168.88.110:7051"
+      kudu_table = "studentlyhresultflink"
+      save_mode="append"
+   }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_spark.conf b/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_spark.conf
new file mode 100644
index 000000000..cb4ecbaa8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_spark.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 2
+    spark.executor.memory = "1g"
+    spark.master = local
+  #job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    KuduSource {
+      result_table_name = "studentlyh2"
+      kudu_master = "192.168.88.110:7051"
+      kudu_table = "studentlyh2"
+      columnsList = "id,name,age,sex"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+      sql = "select id,name,age,sex from studentlyh2"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+    kuduSink {
+        kudu_master = "192.168.88.110:7051"
+        kudu_table = "studentlyhresult"
+        save_mode="append"
+     }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index b5ed52de7..60fd7db0e 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -44,6 +44,7 @@
         <module>connector-file</module>
         <module>connector-hudi</module>
         <module>connector-assert</module>
+        <module>connector-kudu</module>
         <module>connector-email</module>
         <module>connector-dingtalk</module>
     </modules>