You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/05/25 14:03:04 UTC

[seatunnel] branch dev updated: [Feature][Connector-v2] Add Snowflake Source&Sink connector (#4470)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 06c59a25f [Feature][Connector-v2] Add Snowflake Source&Sink connector (#4470)
06c59a25f is described below

commit 06c59a25f3e8d49841fa49aa8ca8b8254e135080
Author: Hao Xu <sd...@gmail.com>
AuthorDate: Thu May 25 07:02:56 2023 -0700

    [Feature][Connector-v2] Add Snowflake Source&Sink connector (#4470)
    
    
    ---------
    
    Co-authored-by: Eric <ga...@gmail.com>
    Co-authored-by: hailin0 <wa...@apache.org>
---
 docs/en/connector-v2/sink/Jdbc.md                  |   1 +
 docs/en/connector-v2/sink/Snowflake.md             | 144 +++++++++++++++
 docs/en/connector-v2/source/Jdbc.md                |   1 +
 docs/en/connector-v2/source/Snowflake.md           | 153 ++++++++++++++++
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  11 ++
 .../snowflake/SnowflakeDataTypeConvertor.java      | 201 +++++++++++++++++++++
 .../dialect/snowflake/SnowflakeDialect.java        |  47 +++++
 .../dialect/snowflake/SnowflakeDialectFactory.java |  37 ++++
 .../snowflake/SnowflakeJdbcRowConverter.java       |  27 +++
 .../dialect/snowflake/SnowflakeTypeMapper.java     | 147 +++++++++++++++
 .../catalog/SnowflakeDataTypeConvertorTest.java    |  52 ++++++
 seatunnel-dist/pom.xml                             |  11 +-
 .../src/main/assembly/assembly-bin-ci.xml          |   1 +
 .../connector-jdbc-e2e-part-3/pom.xml              |   5 +
 .../connectors/seatunnel/jdbc/JdbcSnowflakeIT.java |  79 ++++++++
 .../resources/jdbc_snowflake_source_and_sink.conf  |  76 ++++++++
 16 files changed, 991 insertions(+), 2 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index 10f2a3377..50ccaf5cb 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -166,6 +166,7 @@ there are some reference value for params above.
 | Doris      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                                   | /                                                  | https://mvnrepository.com/artifact/mysql/mysql-connector-java                                               |
 | teradata   | com.teradata.jdbc.TeraDriver                 | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test              | /                                                  | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                                               |
 | Redshift   | com.amazon.redshift.jdbc42.Driver            | jdbc:redshift://localhost:5439/testdb                              | com.amazon.redshift.xa.RedshiftXADataSource        | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42                                      |
+| Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver    | jdbc:snowflake://<account_name>.snowflakecomputing.com             | /                                                  | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                                             |
 | Vertica    | com.vertica.jdbc.Driver                      | jdbc:vertica://localhost:5433                                      | /                                                  | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar             |
 
 ## Example
diff --git a/docs/en/connector-v2/sink/Snowflake.md b/docs/en/connector-v2/sink/Snowflake.md
new file mode 100644
index 000000000..76f234cdc
--- /dev/null
+++ b/docs/en/connector-v2/sink/Snowflake.md
@@ -0,0 +1,144 @@
+# Snowflake
+
+> JDBC Snowflake Sink Connector
+
+## Support those engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+## Description
+
+Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing.
+
+## Supported DataSource list
+
+| datasource |                    supported versions                    |                  driver                   |                          url                           |                                    maven                                    |
+|------------|----------------------------------------------------------|-------------------------------------------|--------------------------------------------------------|-----------------------------------------------------------------------------|
+| snowflake  | Different dependency version has different driver class. | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://<account_name>.snowflakecomputing.com | [Download](https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc) |
+
+## Database dependency
+
+> Please download the driver listed in the 'maven' column above and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example Snowflake datasource: cp snowflake-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+|                             Snowflake Data type                             | Seatunnel Data type |
+|-----------------------------------------------------------------------------|---------------------|
+| BOOLEAN                                                                     | BOOLEAN             |
+| TINYINT<br/>SMALLINT<br/>BYTEINT<br/>                                       | SHORT_TYPE          |
+| INT<br/>INTEGER<br/>                                                        | INT                 |
+| BIGINT                                                                      | LONG                |
+| DECIMAL<br/>NUMERIC<br/>NUMBER<br/>                                         | DECIMAL(x,y)        |
+| DECIMAL(x,y)(Get the designated column's specified column size.>38)         | DECIMAL(38,18)      |
+| REAL<br/>FLOAT4                                                             | FLOAT               |
+| DOUBLE<br/>DOUBLE PRECISION<br/>FLOAT8<br/>FLOAT<br/>                       | DOUBLE              |
+| CHAR<br/>CHARACTER<br/>VARCHAR<br/>STRING<br/>TEXT<br/>VARIANT<br/>OBJECT   | STRING              |
+| DATE                                                                        | DATE                |
+| TIME                                                                        | TIME                |
+| DATETIME<br/>TIMESTAMP<br/>TIMESTAMP_LTZ<br/>TIMESTAMP_NTZ<br/>TIMESTAMP_TZ | TIMESTAMP           |
+| BINARY<br/>VARBINARY<br/>GEOGRAPHY<br/>GEOMETRY                             | BYTES               |
+
+## Options
+
+|                   name                    |  type   | required | default value |                                                                                                                 description                                                                                                                  |
+|-------------------------------------------|---------|----------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                                       | String  | Yes      | -             | The URL of the JDBC connection. Refer to a case: jdbc:snowflake://<account_name>.snowflakecomputing.com                                                                                                                                      |
+| driver                                    | String  | Yes      | -             | The jdbc class name used to connect to the remote data source,<br/> if you use Snowflake the value is `net.snowflake.client.jdbc.SnowflakeDriver`.                                                                                           |
+| user                                      | String  | No       | -             | Connection instance user name                                                                                                                                                                                                                |
+| password                                  | String  | No       | -             | Connection instance password                                                                                                                                                                                                                 |
+| query                                     | String  | No       | -             | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority                                                                                                                                       |
+| database                                  | String  | No       | -             | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.<br/>This option is mutually exclusive with `query` and has a higher priority.                                                     |
+| table                                     | String  | No       | -             | Use database and this table-name auto-generate sql and receive upstream input datas write to database.<br/>This option is mutually exclusive with `query` and has a higher priority.                                                         |
+| primary_keys                              | Array   | No       | -             | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql.                                                                                                                          |
+| support_upsert_by_query_primary_key_exist | Boolean | No       | false         | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance |
+| connection_check_timeout_sec              | Int     | No       | 30            | The time in seconds to wait for the database operation used to validate the connection to complete.                                                                                                                                          |
+| max_retries                               | Int     | No       | 0             | The number of retries to submit failed (executeBatch)                                                                                                                                                                                        |
+| batch_size                                | Int     | No       | 1000          | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be flushed into the database                                                           |
+| batch_interval_ms                         | Int     | No       | 1000          | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database                                                                         |
+| max_commit_attempts                       | Int     | No       | 3             | The number of retries for transaction commit failures                                                                                                                                                                                        |
+| transaction_timeout_sec                   | Int     | No       | -1            | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect<br/>exactly-once semantics                                                                                          |
+| auto_commit                               | Boolean | No       | true          | Automatic transaction commit is enabled by default                                                                                                                                                                                           |
+| common-options                            |         | no       | -             | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details                                                                                                                                          |
+
+## tips
+
+> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
+
+## Task Example
+
+### simple:
+
+> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to JDBC Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target table is test_table will also be 16 rows of data in the table. Before run this job, you need create database test and table test_table in your snowflake database. And if you have not yet installed and depl [...]
+>
+> ```
+> # Defining the runtime environment
+> env {
+> # You can set flink configuration here
+> execution.parallelism = 1
+> job.mode = "BATCH"
+> }
+> source {
+> # This is a example source plugin **only for test and demonstrate the feature source plugin**
+> FakeSource {
+> parallelism = 1
+> result_table_name = "fake"
+> row.num = 16
+> schema = {
+> fields {
+> name = "string"
+> age = "int"
+> }
+> }
+> }
+> # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+> # please go to https://seatunnel.apache.org/docs/category/source-v2
+> }
+> transform {
+> # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+> # please go to https://seatunnel.apache.org/docs/category/transform-v2
+> }
+> sink {
+> jdbc {
+> url = "jdbc:snowflake://<account_name>.snowflakecomputing.com"
+> driver = "net.snowflake.client.jdbc.SnowflakeDriver"
+> user = "root"
+> password = "123456"
+> query = "insert into test_table(name,age) values(?,?)"
+> }
+> # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+> # please go to https://seatunnel.apache.org/docs/category/sink-v2
+> }
+> ```
+
+### CDC(Change data capture) event
+
+> CDC change data is also supported. In this case, you need to configure database, table and primary_keys.
+>
+> ```
+> jdbc {
+> url = "jdbc:snowflake://<account_name>.snowflakecomputing.com"
+> driver = "net.snowflake.client.jdbc.SnowflakeDriver"
+> user = "root"
+> password = "123456"
+>
+> # You need to configure both database and table
+> database = test
+> table = sink_table
+> primary_keys = ["id","name"]
+> }
+> ```
+
diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 61730863e..a6a81b6f5 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -117,6 +117,7 @@ there are some reference value for params above.
 | saphana    | com.sap.db.jdbc.Driver                              | jdbc:sap://localhost:39015                                             | https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc                                              |
 | doris      | com.mysql.cj.jdbc.Driver                            | jdbc:mysql://localhost:3306/test                                       | https://mvnrepository.com/artifact/mysql/mysql-connector-java                                               |
 | teradata   | com.teradata.jdbc.TeraDriver                        | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test                  | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                                               |
+| Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver           | jdbc:snowflake://<account_name>.snowflakecomputing.com                 | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                                             |
 | Redshift   | com.amazon.redshift.jdbc42.Driver                   | jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000         | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42                                      |
 | Vertica    | com.vertica.jdbc.Driver                             | jdbc:vertica://localhost:5433                                          | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar             |
 
diff --git a/docs/en/connector-v2/source/Snowflake.md b/docs/en/connector-v2/source/Snowflake.md
new file mode 100644
index 000000000..74d31283a
--- /dev/null
+++ b/docs/en/connector-v2/source/Snowflake.md
@@ -0,0 +1,153 @@
+# Snowflake
+
+> JDBC Snowflake Source Connector
+
+## Support those engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+> Supports query SQL and can achieve the projection effect.
+
+## Description
+
+Read external data source data through JDBC.
+
+## Supported DataSource list
+
+| datasource |                    supported versions                    |                  driver                   |                          url                           |                                    maven                                    |
+|------------|----------------------------------------------------------|-------------------------------------------|--------------------------------------------------------|-----------------------------------------------------------------------------|
+| snowflake  | Different dependency version has different driver class. | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://<account_name>.snowflakecomputing.com | [Download](https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc) |
+
+## Database dependency
+
+> Please download the driver listed in the 'maven' column above and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example Snowflake datasource: cp snowflake-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+|                             Snowflake Data type                             | Seatunnel Data type |
+|-----------------------------------------------------------------------------|---------------------|
+| BOOLEAN                                                                     | BOOLEAN             |
+| TINYINT<br/>SMALLINT<br/>BYTEINT<br/>                                       | SHORT_TYPE          |
+| INT<br/>INTEGER<br/>                                                        | INT                 |
+| BIGINT                                                                      | LONG                |
+| DECIMAL<br/>NUMERIC<br/>NUMBER<br/>                                         | DECIMAL(x,y)        |
+| DECIMAL(x,y)(Get the designated column's specified column size.>38)         | DECIMAL(38,18)      |
+| REAL<br/>FLOAT4                                                             | FLOAT               |
+| DOUBLE<br/>DOUBLE PRECISION<br/>FLOAT8<br/>FLOAT<br/>                       | DOUBLE              |
+| CHAR<br/>CHARACTER<br/>VARCHAR<br/>STRING<br/>TEXT<br/>VARIANT<br/>OBJECT   | STRING              |
+| DATE                                                                        | DATE                |
+| TIME                                                                        | TIME                |
+| DATETIME<br/>TIMESTAMP<br/>TIMESTAMP_LTZ<br/>TIMESTAMP_NTZ<br/>TIMESTAMP_TZ | TIMESTAMP           |
+| BINARY<br/>VARBINARY                                                        | BYTES               |
+| GEOGRAPHY (WKB or EWKB)<br/>GEOMETRY (WKB or EWKB)                          | BYTES               |
+| GEOGRAPHY (GeoJSON, WKT or EWKT)<br/>GEOMETRY (GeoJSON, WKT or EWKT)        | STRING              |
+
+## Options
+
+|             name             |  type  | required |     default     |                                                                                                                            description                                                                                                                            |
+|------------------------------|--------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                          | String | Yes      | -               | The URL of the JDBC connection. Refer to a case: jdbc:snowflake://<account_name>.snowflakecomputing.com                                                                                                                                                           |
+| driver                       | String | Yes      | -               | The jdbc class name used to connect to the remote data source,<br/> if you use Snowflake the value is `net.snowflake.client.jdbc.SnowflakeDriver`.                                                                                                                |
+| user                         | String | No       | -               | Connection instance user name                                                                                                                                                                                                                                     |
+| password                     | String | No       | -               | Connection instance password                                                                                                                                                                                                                                      |
+| query                        | String | Yes      | -               | Query statement                                                                                                                                                                                                                                                   |
+| connection_check_timeout_sec | Int    | No       | 30              | The time in seconds to wait for the database operation used to validate the connection to complete                                                                                                                                                                |
+| partition_column             | String | No       | -               | The column name used to partition parallel reads. Only a numeric primary key column is supported, and only a single column can be configured.                                                                                                                     |
+| partition_lower_bound        | Long   | No       | -               | The partition_column min value for scan, if not set SeaTunnel will query database get min value.                                                                                                                                                                  |
+| partition_upper_bound        | Long   | No       | -               | The partition_column max value for scan, if not set SeaTunnel will query database get max value.                                                                                                                                                                  |
+| partition_num                | Int    | No       | job parallelism | The number of partition count, only support positive integer. default value is job parallelism                                                                                                                                                                    |
+| fetch_size                   | Int    | No       | 0               | For queries that return a large number of objects, you can configure<br/>the row fetch size used in the query to improve performance by<br/>reducing the number of database hits required to satisfy the selection criteria.<br/>Zero means use the JDBC default. |
+| common-options               |        | No       | -               | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details                                                                                                                                                           |
+
+## tips
+
+> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
+>
+> JDBC driver connection parameters are supported in the JDBC connection string. E.g., you can add `?GEOGRAPHY_OUTPUT_FORMAT='EWKT'` to specify the output format of geospatial data types. For more information about configurable parameters and geospatial data types, please visit the official Snowflake [documentation](https://docs.snowflake.com/en/sql-reference/data-types-geospatial).
+
+## Task Example
+
+### simple:
+
+> This example queries 16 rows from the table type_bin in your test database with single parallelism and reads all of its fields. You can also specify which fields to query for the final output to the console.
+>
+> ```
+> # Defining the runtime environment
+> env {
+> # You can set flink configuration here
+> execution.parallelism = 2
+> job.mode = "BATCH"
+> }
+> source{
+> Jdbc {
+> url = "jdbc:snowflake://<account_name>.snowflakecomputing.com"
+> driver = "net.snowflake.client.jdbc.SnowflakeDriver"
+> connection_check_timeout_sec = 100
+> user = "root"
+> password = "123456"
+> query = "select * from type_bin limit 16"
+> }
+> }
+> transform {
+> # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+> # please go to https://seatunnel.apache.org/docs/transform/sql
+> }
+> sink {
+> Console {}
+> }
+> ```
+
+### parallel:
+
+> Read your query table in parallel using the shard field you configured and the shard data. You can do this if you want to read the whole table.
+>
+> ```
+> Jdbc {
+> url = "jdbc:snowflake://<account_name>.snowflakecomputing.com"
+> driver = "net.snowflake.client.jdbc.SnowflakeDriver"
+> connection_check_timeout_sec = 100
+> user = "root"
+> password = "123456"
+> # Define query logic as required
+> query = "select * from type_bin"
+> # Parallel sharding reads fields
+> partition_column = "id"
+> # Number of fragments
+> partition_num = 10
+> }
+> ```
+
+### parallel boundary:
+
+> It is more efficient to read your data source according to the upper and lower boundaries you configured for the query.
+>
+> ```
+> Jdbc {
+> url = "jdbc:snowflake://<account_name>.snowflakecomputing.com"
+> driver = "net.snowflake.client.jdbc.SnowflakeDriver"
+> connection_check_timeout_sec = 100
+> user = "root"
+> password = "123456"
+> # Define query logic as required
+> query = "select * from type_bin"
+> partition_column = "id"
+> # Read start boundary
+> partition_lower_bound = 1
+> # Read end boundary
+> partition_upper_bound = 500
+> partition_num = 10
+> }
+> ```
+
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index edeb4ccde..7b451df12 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -43,6 +43,7 @@
         <teradata.version>17.20.00.12</teradata.version>
         <redshift.version>2.1.0.9</redshift.version>
         <saphana.version>2.14.7</saphana.version>
+        <snowflake.version>3.13.29</snowflake.version>
         <vertica.version>12.0.3-0</vertica.version>
         <postgis.jdbc.version>2.5.1</postgis.jdbc.version>
     </properties>
@@ -130,6 +131,12 @@
                 <version>${saphana.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>net.snowflake</groupId>
+                <artifactId>snowflake-jdbc</artifactId>
+                <version>${snowflake.version}</version>
+                <scope>provided</scope>
+            </dependency>
             <dependency>
                 <groupId>com.vertica.jdbc</groupId>
                 <artifactId>vertica-jdbc</artifactId>
@@ -203,6 +210,10 @@
             <artifactId>ngdbc</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>net.snowflake</groupId>
+            <artifactId>snowflake-jdbc</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.vertica.jdbc</groupId>
             <artifactId>vertica-jdbc</artifactId>
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
new file mode 100644
index 000000000..4a808e9d3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/snowflake/SnowflakeDataTypeConvertor.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.snowflake;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import org.apache.commons.collections4.MapUtils;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@AutoService(DataTypeConvertor.class)
+public class SnowflakeDataTypeConvertor implements DataTypeConvertor<String> {
+
+    public static final String PRECISION = "precision";
+    public static final String SCALE = "scale";
+    public static final Integer DEFAULT_PRECISION = 10;
+    public static final Integer DEFAULT_SCALE = 0;
+
+    /* ============================ data types ===================== */
+    private static final String SNOWFLAKE_NUMBER = "NUMBER";
+    private static final String SNOWFLAKE_DECIMAL = "DECIMAL";
+    private static final String SNOWFLAKE_NUMERIC = "NUMERIC";
+    private static final String SNOWFLAKE_INT = "INT";
+    private static final String SNOWFLAKE_INTEGER = "INTEGER";
+    private static final String SNOWFLAKE_BIGINT = "BIGINT";
+    private static final String SNOWFLAKE_SMALLINT = "SMALLINT";
+    private static final String SNOWFLAKE_TINYINT = "TINYINT";
+    private static final String SNOWFLAKE_BYTEINT = "BYTEINT";
+
+    private static final String SNOWFLAKE_FLOAT = "FLOAT";
+    private static final String SNOWFLAKE_FLOAT4 = "FLOAT4";
+    private static final String SNOWFLAKE_FLOAT8 = "FLOAT8";
+    private static final String SNOWFLAKE_DOUBLE = "DOUBLE";
+    private static final String SNOWFLAKE_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String SNOWFLAKE_REAL = "REAL";
+
+    private static final String SNOWFLAKE_VARCHAR = "VARCHAR";
+    private static final String SNOWFLAKE_CHAR = "CHAR";
+    private static final String SNOWFLAKE_CHARACTER = "CHARACTER";
+    private static final String SNOWFLAKE_STRING = "STRING";
+    private static final String SNOWFLAKE_TEXT = "TEXT";
+    private static final String SNOWFLAKE_BINARY = "BINARY";
+    private static final String SNOWFLAKE_VARBINARY = "VARBINARY";
+
+    private static final String SNOWFLAKE_BOOLEAN = "BOOLEAN";
+
+    private static final String SNOWFLAKE_DATE = "DATE";
+    private static final String SNOWFLAKE_DATE_TIME = "DATE_TIME";
+    private static final String SNOWFLAKE_TIME = "TIME";
+    private static final String SNOWFLAKE_TIMESTAMP = "TIMESTAMP";
+    private static final String SNOWFLAKE_TIMESTAMP_LTZ = "TIMESTAMP_LTZ";
+    private static final String SNOWFLAKE_TIMESTAMP_NTZ = "TIMESTAMP_NTZ";
+    private static final String SNOWFLAKE_TIMESTAMP_TZ = "TIMESTAMP_TZ";
+
+    private static final String SNOWFLAKE_GEOGRAPHY = "GEOGRAPHY";
+    private static final String SNOWFLAKE_GEOMETRY = "GEOMETRY";
+
+    private static final String SNOWFLAKE_VARIANT = "VARIANT";
+    private static final String SNOWFLAKE_OBJECT = "OBJECT";
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        return toSeaTunnelType(connectorDataType, Collections.emptyMap());
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            String connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        checkNotNull(connectorDataType, "snowflakeType cannot be null");
+        // Type names are matched case-insensitively, as SnowflakeTypeMapper already does.
+        switch (connectorDataType.toUpperCase(java.util.Locale.ROOT)) {
+            case SNOWFLAKE_SMALLINT:
+            case SNOWFLAKE_TINYINT:
+            case SNOWFLAKE_BYTEINT:
+                return BasicType.SHORT_TYPE;
+            case SNOWFLAKE_INTEGER:
+            case SNOWFLAKE_INT:
+                return BasicType.INT_TYPE;
+            case SNOWFLAKE_BIGINT:
+                return BasicType.LONG_TYPE;
+            case SNOWFLAKE_DECIMAL:
+            case SNOWFLAKE_NUMERIC:
+            case SNOWFLAKE_NUMBER:
+                Integer precision =
+                        MapUtils.getInteger(dataTypeProperties, PRECISION, DEFAULT_PRECISION);
+                Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE, DEFAULT_SCALE);
+                return new DecimalType(precision, scale);
+            case SNOWFLAKE_REAL:
+            case SNOWFLAKE_FLOAT4:
+                return BasicType.FLOAT_TYPE;
+            case SNOWFLAKE_DOUBLE:
+            case SNOWFLAKE_DOUBLE_PRECISION:
+            case SNOWFLAKE_FLOAT8:
+            case SNOWFLAKE_FLOAT:
+                return BasicType.DOUBLE_TYPE;
+            case SNOWFLAKE_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case SNOWFLAKE_CHAR:
+            case SNOWFLAKE_CHARACTER:
+            case SNOWFLAKE_VARCHAR:
+            case SNOWFLAKE_STRING:
+            case SNOWFLAKE_TEXT:
+            case SNOWFLAKE_VARIANT:
+            case SNOWFLAKE_OBJECT:
+            case SNOWFLAKE_GEOMETRY:
+                return BasicType.STRING_TYPE;
+            case SNOWFLAKE_BINARY:
+            case SNOWFLAKE_VARBINARY:
+            case SNOWFLAKE_GEOGRAPHY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case SNOWFLAKE_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case SNOWFLAKE_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case SNOWFLAKE_DATE_TIME:
+            case SNOWFLAKE_TIMESTAMP:
+            case SNOWFLAKE_TIMESTAMP_LTZ:
+            case SNOWFLAKE_TIMESTAMP_NTZ:
+            case SNOWFLAKE_TIMESTAMP_TZ:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support SNOWFLAKE type '%s' yet.", connectorDataType));
+        }
+    }
+
+    @Override
+    public String toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+
+        switch (sqlType) {
+            case TINYINT:
+            case SMALLINT:
+                return SNOWFLAKE_SMALLINT;
+            case INT:
+                return SNOWFLAKE_INTEGER;
+            case BIGINT:
+                return SNOWFLAKE_BIGINT;
+            case DECIMAL:
+                return SNOWFLAKE_DECIMAL;
+            case FLOAT:
+                return SNOWFLAKE_FLOAT4;
+            case DOUBLE:
+                return SNOWFLAKE_DOUBLE_PRECISION;
+            case BOOLEAN:
+                return SNOWFLAKE_BOOLEAN;
+            case STRING:
+                return SNOWFLAKE_TEXT;
+            case DATE:
+                return SNOWFLAKE_DATE;
+            case BYTES: // raw bytes need a binary column; GEOMETRY only accepts spatial data
+                return SNOWFLAKE_BINARY;
+            case TIME:
+                return SNOWFLAKE_TIME;
+            case TIMESTAMP:
+                return SNOWFLAKE_TIMESTAMP;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support SeaTunnel type '%s' yet.", seaTunnelDataType));
+        }
+    }
+
+    @Override
+    public String getIdentity() {
+        return "SNOWFLAKE";
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialect.java
new file mode 100644
index 000000000..e3f071825
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialect.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.snowflake;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.util.Optional;
+
+public class SnowflakeDialect implements JdbcDialect {
+    @Override
+    public String dialectName() {
+        return "Snowflake";
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new SnowflakeJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new SnowflakeTypeMapper();
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        return Optional.empty();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialectFactory.java
new file mode 100644
index 000000000..0b30c6e4f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialectFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.snowflake;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/** Factory for {@link SnowflakeDialect}. */
+@AutoService(JdbcDialectFactory.class)
+public class SnowflakeDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:snowflake:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new SnowflakeDialect();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeJdbcRowConverter.java
new file mode 100644
index 000000000..3ab4994c7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeJdbcRowConverter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.snowflake;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+public class SnowflakeJdbcRowConverter extends AbstractJdbcRowConverter {
+    @Override
+    public String converterName() {
+        return "Snowflake";
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeTypeMapper.java
new file mode 100644
index 000000000..c7f875de3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeTypeMapper.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.snowflake;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+
+@Slf4j
+public class SnowflakeTypeMapper implements JdbcDialectTypeMapper {
+
+    /* ============================ data types ===================== */
+    private static final String SNOWFLAKE_NUMBER = "NUMBER";
+    private static final String SNOWFLAKE_DECIMAL = "DECIMAL";
+    private static final String SNOWFLAKE_NUMERIC = "NUMERIC";
+    private static final String SNOWFLAKE_INT = "INT";
+    private static final String SNOWFLAKE_INTEGER = "INTEGER";
+    private static final String SNOWFLAKE_BIGINT = "BIGINT";
+    private static final String SNOWFLAKE_SMALLINT = "SMALLINT";
+    private static final String SNOWFLAKE_TINYINT = "TINYINT";
+    private static final String SNOWFLAKE_BYTEINT = "BYTEINT";
+
+    private static final String SNOWFLAKE_FLOAT = "FLOAT";
+    private static final String SNOWFLAKE_FLOAT4 = "FLOAT4";
+    private static final String SNOWFLAKE_FLOAT8 = "FLOAT8";
+    private static final String SNOWFLAKE_DOUBLE = "DOUBLE";
+    private static final String SNOWFLAKE_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String SNOWFLAKE_REAL = "REAL";
+
+    private static final String SNOWFLAKE_VARCHAR = "VARCHAR";
+    private static final String SNOWFLAKE_CHAR = "CHAR";
+    private static final String SNOWFLAKE_CHARACTER = "CHARACTER";
+    private static final String SNOWFLAKE_STRING = "STRING";
+    private static final String SNOWFLAKE_TEXT = "TEXT";
+    private static final String SNOWFLAKE_BINARY = "BINARY";
+    private static final String SNOWFLAKE_VARBINARY = "VARBINARY";
+
+    private static final String SNOWFLAKE_BOOLEAN = "BOOLEAN";
+
+    private static final String SNOWFLAKE_DATE = "DATE";
+    private static final String SNOWFLAKE_DATE_TIME = "DATE_TIME";
+    private static final String SNOWFLAKE_TIME = "TIME";
+    private static final String SNOWFLAKE_TIMESTAMP = "TIMESTAMP";
+    private static final String SNOWFLAKE_TIMESTAMP_LTZ = "TIMESTAMPLTZ";
+    private static final String SNOWFLAKE_TIMESTAMP_NTZ = "TIMESTAMPNTZ";
+    private static final String SNOWFLAKE_TIMESTAMP_TZ = "TIMESTAMPTZ";
+
+    private static final String SNOWFLAKE_GEOGRAPHY = "GEOGRAPHY";
+    private static final String SNOWFLAKE_GEOMETRY = "GEOMETRY";
+
+    private static final String SNOWFLAKE_VARIANT = "VARIANT";
+    private static final String SNOWFLAKE_OBJECT = "OBJECT";
+
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        String snowflakeType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+        switch (snowflakeType) {
+            case SNOWFLAKE_SMALLINT:
+            case SNOWFLAKE_TINYINT:
+            case SNOWFLAKE_BYTEINT:
+                return BasicType.SHORT_TYPE;
+            case SNOWFLAKE_INTEGER:
+            case SNOWFLAKE_INT:
+                return BasicType.INT_TYPE;
+            case SNOWFLAKE_BIGINT:
+                return BasicType.LONG_TYPE;
+            case SNOWFLAKE_DECIMAL:
+            case SNOWFLAKE_NUMERIC:
+            case SNOWFLAKE_NUMBER:
+                return new DecimalType(precision, scale);
+            case SNOWFLAKE_REAL:
+            case SNOWFLAKE_FLOAT4:
+                return BasicType.FLOAT_TYPE;
+            case SNOWFLAKE_DOUBLE:
+            case SNOWFLAKE_DOUBLE_PRECISION:
+            case SNOWFLAKE_FLOAT8:
+            case SNOWFLAKE_FLOAT:
+                return BasicType.DOUBLE_TYPE;
+            case SNOWFLAKE_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case SNOWFLAKE_CHAR:
+            case SNOWFLAKE_CHARACTER:
+            case SNOWFLAKE_VARCHAR:
+            case SNOWFLAKE_STRING:
+            case SNOWFLAKE_TEXT:
+            case SNOWFLAKE_VARIANT:
+            case SNOWFLAKE_OBJECT:
+                return BasicType.STRING_TYPE;
+            case SNOWFLAKE_GEOGRAPHY:
+            case SNOWFLAKE_GEOMETRY:
+                int geoMetaType = metadata.getColumnType(colIndex);
+                switch (geoMetaType) {
+                    case Types.BINARY:
+                        return PrimitiveByteArrayType.INSTANCE;
+                    case Types.VARCHAR:
+                    default:
+                        return BasicType.STRING_TYPE;
+                }
+            case SNOWFLAKE_BINARY:
+            case SNOWFLAKE_VARBINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case SNOWFLAKE_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case SNOWFLAKE_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case SNOWFLAKE_DATE_TIME:
+            case SNOWFLAKE_TIMESTAMP:
+            case SNOWFLAKE_TIMESTAMP_LTZ:
+            case SNOWFLAKE_TIMESTAMP_NTZ:
+            case SNOWFLAKE_TIMESTAMP_TZ:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support SNOWFLAKE type '%s' on column '%s'  yet.",
+                                snowflakeType, jdbcColumnName));
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
new file mode 100644
index 000000000..e0e8cef8c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/SnowflakeDataTypeConvertorTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.snowflake.SnowflakeDataTypeConvertor;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import net.snowflake.client.jdbc.SnowflakeType;
+
+import java.util.Collections;
+
+public class SnowflakeDataTypeConvertorTest {
+    private final SnowflakeDataTypeConvertor snowflakeDataTypeConvertor =
+            new SnowflakeDataTypeConvertor();
+
+    @Test
+    public void toSeaTunnelType() {
+
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE,
+                snowflakeDataTypeConvertor.toSeaTunnelType(
+                        SnowflakeType.TEXT.name(), Collections.emptyMap()));
+
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE,
+                snowflakeDataTypeConvertor.toSeaTunnelType(
+                        SnowflakeType.VARIANT.name(), Collections.emptyMap()));
+
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE,
+                snowflakeDataTypeConvertor.toSeaTunnelType(
+                        SnowflakeType.OBJECT.name(), Collections.emptyMap()));
+    }
+}
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 7239917bc..b098633c4 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -97,14 +97,14 @@
                 <tablestore.version>5.13.9</tablestore.version>
                 <teradata.version>17.20.00.12</teradata.version>
                 <redshift.version>2.1.0.9</redshift.version>
+                <snowflake.version>3.13.29</snowflake.version>
 
-                <!-- Imap storage dependency package  -->
+              <!-- Imap storage dependency package  -->
                 <hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
                 <json-smart.version>2.4.7</json-smart.version>
                 <hadoop-aws.version>3.1.4</hadoop-aws.version>
                 <aws-java-sdk.version>1.11.271</aws-java-sdk.version>
                 <netty-buffer.version>4.1.89.Final</netty-buffer.version>
-
             </properties>
             <dependencies>
                 <!-- starters -->
@@ -584,6 +584,13 @@
                     <version>${redshift.version}</version>
                     <scope>provided</scope>
                 </dependency>
+
+                <dependency>
+                    <groupId>net.snowflake</groupId>
+                    <artifactId>snowflake-jdbc</artifactId>
+                    <version>${snowflake.version}</version>
+                    <scope>provided</scope>
+                </dependency>
                 <!-- jdbc driver end -->
 
                 <dependency>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 08d3d759a..6daa5564f 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -201,6 +201,7 @@
                 <include>com.aliyun.openservices:tablestore-jdbc:jar</include>
                 <include>com.teradata.jdbc:terajdbc4:jar</include>
                 <include>com.amazon.redshift:redshift-jdbc42:jar</include>
+                <include>net.snowflake:snowflake-jdbc:jar</include>
             </includes>
             <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
             <outputDirectory>/plugins/jdbc/lib</outputDirectory>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
index 8c203354b..81ecdc298 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
@@ -41,6 +41,11 @@
             <version>${testcontainer.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>net.snowflake</groupId>
+            <artifactId>snowflake-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>mssqlserver</artifactId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSnowflakeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSnowflakeIT.java
new file mode 100644
index 000000000..9dbe57f72
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSnowflakeIT.java
@@ -0,0 +1,79 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+
+import java.sql.Connection;
+
+@Disabled("Disabled because it needs user's personal snowflake account to run this test!")
+public class JdbcSnowflakeIT extends TestSuiteBase implements TestResource {
+    private static final String URL = "jdbc:snowflake://<account_name>.snowflakecomputing.com";
+    private static final String USERNAME = "user";
+    private static final String PASSWORD = "password";
+    private static final String SNOWFLAKE_DRIVER_JAR =
+            "https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.29/snowflake-jdbc-3.13.29.jar";
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                container.execInContainer(
+                        "bash",
+                        "-c",
+                        "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                + SNOWFLAKE_DRIVER_JAR);
+            };
+
+    private Connection connection;
+
+    @TestTemplate
+    public void testSnowflake(TestContainer container) throws Exception {
+        container.executeExtraCommands(extendedFactory);
+        Container.ExecResult execResult =
+                container.executeJob("/jdbc_snowflake_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        SnowflakeBasicDataSource dataSource = new SnowflakeBasicDataSource();
+        dataSource.setUrl(URL);
+        dataSource.setUser(USERNAME);
+        dataSource.setPassword(PASSWORD);
+        this.connection = dataSource.getConnection();
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            this.connection.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_snowflake_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_snowflake_source_and_sink.conf
new file mode 100644
index 000000000..d903cad25
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_snowflake_source_and_sink.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Job-level settings: parallelism of 1, executed in BATCH mode.
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+# Source: reads 26 columns from TEST_INPUT_DB.TEST_INPUT_SCHEMA.MOCK_DATA through the
+# Snowflake JDBC driver.
+# NOTE(review): url, user and password look like placeholders ("<account_id>", "user",
+# "password") — presumably they must be replaced with real account values; confirm.
+# NOTE(review): this url contains ".aws." while the sink url does not — confirm which host
+# form is intended.
+source{
+    jdbc {
+        url = "jdbc:snowflake://<account_id>.aws.snowflakecomputing.com"
+        driver = "net.snowflake.client.jdbc.SnowflakeDriver"
+        user = "user"
+        password = "password"
+        query = """
+        SELECT
+          ID,
+          NUM,
+          DEC,
+          INT,
+          BIGINT,
+          SMALLINT,
+          TINYINT,
+          BYTEINT,
+          FLOAT,
+          DOUBLE,
+          VARCHAR_COL,
+          CHAR_COL,
+          STRING_COL,
+          BOOLEAN_COL,
+          DATE_COL,
+          TIME_COL,
+          TIMESTAMP_COL,
+          TIMESTAMP_NTZ_COL,
+          TIMESTAMP_LTZ_COL,
+          TIMESTAMP_TZ_COL,
+          VARIANT_COL,
+          OBJECT_COL,
+          GEOGRAPHY_COL,
+          GEOMETRY_COL,
+          BINARY_COL,
+          VARBINARY_COL
+        FROM TEST_INPUT_DB.TEST_INPUT_SCHEMA.MOCK_DATA;
+        """
+    }
+}
+
+# No transforms are configured for this job.
+transform {
+}
+
+# Sink: writes rows to Snowflake through the JDBC driver using a parameterized INSERT with
+# positional "?" placeholders.
+# NOTE(review): the INSERT targets TEST_INPUT_DB.TEST_INPUT_SCHEMA.MOCK_DATA, the same table
+# the source query reads — confirm this is intended rather than a separate output table.
+sink {
+  jdbc {
+          url = "jdbc:snowflake://<account_id>.snowflakecomputing.com"
+          driver = "net.snowflake.client.jdbc.SnowflakeDriver"
+          user = "user"
+          password = "password"
+          query = """
+          INSERT INTO TEST_INPUT_DB.TEST_INPUT_SCHEMA.MOCK_DATA (id, num, dec, int, bigint, smallint, tinyint, byteint, float, double, varchar_col, char_col, string_col, boolean_col, date_col, time_col, timestamp_col, timestamp_ntz_col, timestamp_ltz_col, timestamp_tz_col, variant_col, object_col, geography_col, geometry_col, binary_col, varbinary_col)
+          values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
+          """
+      }
+}