You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/14 02:39:31 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add influxDB connector source (#2697)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1d70ea308 [Feature][Connector-V2] Add influxDB connector source (#2697)
1d70ea308 is described below

commit 1d70ea30847abd36c22f602e70c95fae2c4f7ab0
Author: Bibo <33...@users.noreply.github.com>
AuthorDate: Fri Oct 14 10:39:25 2022 +0800

    [Feature][Connector-V2] Add influxDB connector source (#2697)
    
    * [Feature][Connector-V2] Add influxDB connector source
---
 docs/en/connector-v2/source/InfluxDB.md            | 170 ++++++++++++++++
 plugin-mapping.properties                          |   1 +
 seatunnel-connectors-v2/connector-influxdb/pom.xml |  82 ++++++++
 .../seatunnel/influxdb/client/InfluxDBClient.java  |  78 ++++++++
 .../seatunnel/influxdb/config/InfluxDBConfig.java  | 114 +++++++++++
 .../influxdb/converter/InfluxDBRowConverter.java   |  66 +++++++
 .../seatunnel/influxdb/source/InfluxDBSource.java  | 124 ++++++++++++
 .../influxdb/source/InfluxDBSourceSplit.java       |  40 ++++
 .../source/InfluxDBSourceSplitEnumerator.java      | 215 +++++++++++++++++++++
 .../influxdb/source/InfluxdbSourceReader.java      | 147 ++++++++++++++
 .../influxdb/state/InfluxDBSourceState.java        |  35 ++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 .../connector-influxdb-flink-e2e}/pom.xml          |  33 ++--
 .../v2/influxdb/InfluxDBSourceToAssertIT.java      | 115 +++++++++++
 .../influxdb/influxdb_source_to_assert.conf        | 180 +++++++++++++++++
 .../src/test/resources/log4j.properties            |  22 +++
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   1 +
 .../connector-influxdb-spark-e2e}/pom.xml          |  41 ++--
 .../v2/influxdb/InfluxDBSourceToAssertIT.java      | 115 +++++++++++
 .../influxdb/influxdb_source_to_assert.conf        | 182 +++++++++++++++++
 .../src/test/resources/log4j.properties            |  22 +++
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   1 +
 22 files changed, 1746 insertions(+), 39 deletions(-)

diff --git a/docs/en/connector-v2/source/InfluxDB.md b/docs/en/connector-v2/source/InfluxDB.md
new file mode 100644
index 000000000..ab3731a55
--- /dev/null
+++ b/docs/en/connector-v2/source/InfluxDB.md
@@ -0,0 +1,170 @@
+# InfluxDB
+
+> InfluxDB source connector
+
+## Description
+
+Read external data source data through InfluxDB.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+
+## Options
+
+| name               | type   | required | default value |
+|--------------------|--------|----------|---------------|
+| url                | string | yes      | -             |
+| sql                | string | yes      | -             |
+| fields             | config | yes      | -             |
+| database           | string | yes      |               |
+| username           | string | no       | -             |
+| password           | string | no       | -             |
+| lower_bound        | long   | no       | -             |
+| upper_bound        | long   | no       | -             |
+| partition_num      | int    | no       | -             |
+| split_column       | string | no       | -             |
+| epoch              | string | no       | n             |
+| connect_timeout_ms | long   | no       | 15000         |
+| query_timeout_sec  | int    | no       | 3             |
+
+### url
+the url to connect to influxDB e.g.
+``` 
+http://influxdb-host:8086
+```
+
+### sql [string]
+The query sql used to search data
+
+```
+select name,age from test
+```
+
+### fields [string]
+
+the fields of the InfluxDB when you select
+
+the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
+
+e.g.
+
+```
+fields{
+    name=STRING
+    age=INT
+    }
+```
+
+### database [string]
+
+The `influxDB` database
+
+### username [string]
+
+the username of the influxDB when you select
+
+### password [string]
+
+the password of the influxDB when you select
+
+### split_column [string]
+
+the `split_column` of the influxDB when you select
+
+> Tips:
+> - influxDB tags is not supported as a segmented primary key because the type of tags can only be a string
+> - influxDB time is not supported as a segmented primary key because the time field cannot participate in mathematical calculation
+> - Currently, `split_column` only supports integer data segmentation, and does not support `float`, `string`, `date` and other types.
+
+### upper_bound [long]
+
+upper bound of the `split_column`column
+
+### lower_bound [long]
+
+lower bound of the `split_column` column
+
+```
+     split the $split_column range into $partition_num parts
+     if partition_num is 1, use the whole `split_column` range
+     if partition_num < (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions
+     
+     eg: lower_bound = 1, upper_bound = 10, partition_num = 2
+     sql = "select * from test where age > 0 and age < 10"
+     
+     split result
+
+     split 1: select * from test where ($split_column >= 1 and $split_column < 6)  and (  age > 0 and age < 10 )
+     
+     split 2: select * from test where ($split_column >= 6 and $split_column < 11) and (  age > 0 and age < 10 )
+
+```
+
+### partition_num [int]
+
+the `partition_num` of the InfluxDB when you select
+> Tips: Ensure that `upper_bound` minus `lower_bound` is divided `bypartition_num`, otherwise the query results will overlap
+
+### epoch [string]
+returned time precision
+- Optional values: H, m, s, MS, u, n
+- default value: n
+
+### query_timeout_sec [int]
+the `query_timeout` of the InfluxDB when you select, in seconds
+
+### connect_timeout_ms [long]
+the timeout for connecting to InfluxDB, in milliseconds 
+
+## Examples
+Example of multi parallelism and multi partition scanning 
+```hocon
+source {
+
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select label, value, rt, time from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "value"
+        fields {
+            label = STRING
+            value = INT
+            rt = STRING
+            time = BIGINT
+            }
+    }
+
+}
+
+```
+Example of not using partition scan 
+```hocon
+source {
+
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select label, value, rt, time from test"
+        database = "test"
+        fields {
+            label = STRING
+            value = INT
+            rt = STRING
+            time = BIGINT
+            }
+    }
+
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index bf4b0ce5f..0b786891e 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -130,3 +130,4 @@ seatunnel.sink.Sentry = connector-sentry
 seatunnel.source.MongoDB = connector-mongodb
 seatunnel.sink.MongoDB = connector-mongodb
 seatunnel.source.Iceberg = connector-iceberg
+seatunnel.source.influxdb = connector-influxdb
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-influxdb/pom.xml b/seatunnel-connectors-v2/connector-influxdb/pom.xml
new file mode 100644
index 000000000..0ed188cc1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-influxdb</artifactId>
+
+    <properties>
+        <influxdb.version>2.21</influxdb.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.influxdb</groupId>
+            <artifactId>influxdb-java</artifactId>
+            <version>${influxdb.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>okio</pattern>
+                                    <shadedPattern>shaded.okio</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>okhttp3</pattern>
+                                    <shadedPattern>shaded.okhttp3</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <shadeSourcesContent>false</shadeSourcesContent>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
new file mode 100644
index 000000000..8743d5aa2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.client;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.HttpUrl;
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.impl.InfluxDBImpl;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class InfluxDBClient {
+    public static InfluxDB getInfluxDB(InfluxDBConfig config) throws ConnectException {
+        OkHttpClient.Builder clientBuilder =
+                new OkHttpClient.Builder()
+                        .connectTimeout(config.getConnectTimeOut(), TimeUnit.MILLISECONDS)
+                        .readTimeout(config.getQueryTimeOut(), TimeUnit.SECONDS);
+        InfluxDB.ResponseFormat format = InfluxDB.ResponseFormat.valueOf(config.getFormat());
+        clientBuilder.addInterceptor(
+                new Interceptor() {
+                    @Override
+                    public Response intercept(Chain chain) throws IOException {
+                        Request request = chain.request();
+                        HttpUrl httpUrl =
+                                request.url()
+                                        .newBuilder()
+                                        //set epoch
+                                        .addQueryParameter("epoch", config.getEpoch())
+                                        .build();
+                        Request build = request.newBuilder().url(httpUrl).build();
+                        Response response = chain.proceed(build);
+                        return response;
+                    }
+                });
+        InfluxDB influxDB =
+                new InfluxDBImpl(
+                        config.getUrl(),
+                        StringUtils.isEmpty(config.getUsername()) ? StringUtils.EMPTY : config.getUsername(),
+                        StringUtils.isEmpty(config.getPassword()) ? StringUtils.EMPTY : config.getPassword(),
+                        clientBuilder,
+                        format);
+        String version = influxDB.version();
+        if (!influxDB.ping().isGood()) {
+            String errorMessage =
+                    String.format(
+                            "connect influxdb failed, the url is: {%s}",
+                            config.getUrl());
+            throw new ConnectException(errorMessage);
+        }
+        log.info("connect influxdb successful. sever version :{}.", version);
+        return influxDB;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
new file mode 100644
index 000000000..9a04e7d4a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class InfluxDBConfig implements Serializable {
+
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+    public static final String URL = "url";
+    private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms";
+    private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec";
+
+    public static final String SQL = "sql";
+    public static final String SQL_WHERE = "where";
+
+    public static final String DATABASES = "database";
+    public static final String SPLIT_COLUMN = "split_column";
+    private static final String PARTITION_NUM = "partition_num";
+    private static final String UPPER_BOUND = "upper_bound";
+    private static final String LOWER_BOUND = "lower_bound";
+
+
+    private static final String DEFAULT_FORMAT = "MSGPACK";
+    private static final String EPOCH = "epoch";
+
+    public static final String DEFAULT_PARTITIONS = "0";
+    private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3;
+    private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000;
+
+    private static final String DEFAULT_EPOCH = "n";
+
+    private String url;
+    private String username;
+    private String password;
+    private String sql;
+    private int partitionNum = 0;
+    private String splitKey;
+    private long lowerBound;
+    private long upperBound;
+    private String database;
+
+    private String format = DEFAULT_FORMAT;
+    private int queryTimeOut = DEFAULT_QUERY_TIMEOUT_SEC;
+    private long connectTimeOut = DEFAULT_CONNECT_TIMEOUT_MS;
+
+    private String epoch = DEFAULT_EPOCH;
+
+    List<Integer> columnsIndex;
+
+    public InfluxDBConfig(Config config) {
+        this.url = config.getString(URL);
+        this.sql = config.getString(SQL);
+
+        if (config.hasPath(USERNAME)) {
+            this.username = config.getString(USERNAME);
+        }
+        if (config.hasPath(PASSWORD)) {
+            this.password = config.getString(PASSWORD);
+        }
+        if (config.hasPath(PARTITION_NUM)) {
+            this.partitionNum = config.getInt(PARTITION_NUM);
+        }
+        if (config.hasPath(UPPER_BOUND)) {
+            this.upperBound = config.getInt(UPPER_BOUND);
+        }
+        if (config.hasPath(LOWER_BOUND)) {
+            this.lowerBound = config.getInt(LOWER_BOUND);
+        }
+        if (config.hasPath(SPLIT_COLUMN)) {
+            this.splitKey = config.getString(SPLIT_COLUMN);
+        }
+        if (config.hasPath(DATABASES)) {
+            this.database = config.getString(DATABASES);
+        }
+        if (config.hasPath(EPOCH)) {
+            this.epoch = config.getString(EPOCH);
+        }
+        if (config.hasPath(CONNECT_TIMEOUT_MS)) {
+            this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS);
+        }
+        if (config.hasPath(QUERY_TIMEOUT_SEC)) {
+            this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC);
+        }
+    }
+
+    @VisibleForTesting
+    public InfluxDBConfig(String url) {
+        this.url = url;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
new file mode 100644
index 000000000..405ab2d43
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InfluxDBRowConverter {
+
+    public static SeaTunnelRow convert(List<Object> values, SeaTunnelRowType typeInfo, List<Integer> indexList) {
+
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+        List<Object> fields = new ArrayList<>(seaTunnelDataTypes.length);
+
+        for (int i = 0; i <= seaTunnelDataTypes.length - 1; i++) {
+            Object seaTunnelField;
+            int columnIndex = indexList.get(i);
+            SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i];
+            SqlType fieldSqlType = seaTunnelDataType.getSqlType();
+            if (null == values.get(columnIndex)) {
+                seaTunnelField = null;
+            }
+            else if (SqlType.BOOLEAN.equals(fieldSqlType)) {
+                seaTunnelField = Boolean.parseBoolean(values.get(columnIndex).toString());
+            } else if (SqlType.SMALLINT.equals(fieldSqlType)) {
+                seaTunnelField = Short.valueOf(values.get(columnIndex).toString());
+            } else if (SqlType.INT.equals(fieldSqlType)) {
+                seaTunnelField = Integer.valueOf(values.get(columnIndex).toString());
+            } else if (SqlType.BIGINT.equals(fieldSqlType)) {
+                seaTunnelField = Long.valueOf(values.get(columnIndex).toString());
+            } else if (SqlType.FLOAT.equals(fieldSqlType)) {
+                seaTunnelField = ((Double) values.get(columnIndex)).floatValue();
+            } else if (SqlType.DOUBLE.equals(fieldSqlType)) {
+                seaTunnelField = values.get(columnIndex);
+            } else if (SqlType.STRING.equals(fieldSqlType)) {
+                seaTunnelField = values.get(columnIndex);
+            } else {
+                throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
+            }
+
+            fields.add(seaTunnelField);
+        }
+
+        return new SeaTunnelRow(fields.toArray());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
new file mode 100644
index 000000000..bc971476b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@AutoService(SeaTunnelSource.class)
+public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, InfluxDBSourceSplit, InfluxDBSourceState>  {
+    private SeaTunnelRowType typeInfo;
+    private InfluxDBConfig influxDBConfig;
+
+    private List<Integer> columnsIndexList;
+
+    private static final String QUERY_LIMIT = " limit 1";
+
+    @Override
+    public String getPluginName() {
+        return "InfluxDB";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URL, SQL);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+        try {
+            this.influxDBConfig = new InfluxDBConfig(config);
+            SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(config);
+            this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
+            this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(influxDBConfig));
+        } catch (Exception e) {
+            throw new PrepareFailException("InfluxDB", PluginType.SOURCE, e.toString());
+        }
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType getProducedType() {
+        return typeInfo;
+    }
+
+    @Override
+    public SourceReader createReader(SourceReader.Context readerContext) throws Exception {
+        return new InfluxdbSourceReader(influxDBConfig, readerContext, typeInfo, columnsIndexList);
+    }
+
+    @Override
+    public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception {
+        return new InfluxDBSourceSplitEnumerator(enumeratorContext, influxDBConfig);
+    }
+
+    @Override
+    public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> restoreEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> enumeratorContext, InfluxDBSourceState checkpointState) throws Exception {
+        return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, influxDBConfig);
+    }
+
+    private List<Integer> initColumnsIndex(InfluxDB influxDB)  {
+        //query one row to get column info
+        String query = influxDBConfig.getSql() + QUERY_LIMIT;
+        List<String> fieldNames = new ArrayList<>();
+        try {
+            QueryResult queryResult = influxDB.query(
+                    new Query(query, influxDBConfig.getDatabase()));
+
+            List<QueryResult.Series> serieList = queryResult.getResults().get(0).getSeries();
+            fieldNames.addAll(serieList.get(0).getColumns());
+
+            return Arrays.stream(typeInfo.getFieldNames()).map(x -> fieldNames.indexOf(x)).collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new RuntimeException("get column index of query result exception", e);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
new file mode 100644
index 000000000..37dd688f8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class InfluxDBSourceSplit implements SourceSplit {
+    private String splitId;
+
+    private String query;
+
+    public InfluxDBSourceSplit(String splitId, String query) {
+        this.query = query;
+        this.splitId = splitId;
+    }
+
+    @Override
+    public String splitId() {
+        return splitId;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
new file mode 100644
index 000000000..d22eba116
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+    final InfluxDBConfig config;
+    private final Context<InfluxDBSourceSplit> context;
+    private final Map<Integer, List<InfluxDBSourceSplit>> pendingSplit;
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+        this(context, null, config);
+    }
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+        this.context = context;
+        this.config = config;
+        this.pendingSplit = new HashMap<>();
+        this.shouldEnumerate = sourceState == null;
+        if (sourceState != null) {
+            this.shouldEnumerate = sourceState.isShouldEnumerate();
+            this.pendingSplit.putAll(sourceState.getPendingSplit());
+        }
+    }
+
+    @Override
+    public void run() {
+        Set<Integer> readers = context.registeredReaders();
+        if (shouldEnumerate) {
+            Set<InfluxDBSourceSplit> newSplits = getInfluxDBSplit();
+
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
+
+            assignSplit(readers);
+        }
+
+        log.debug("No more splits to assign." +
+                " Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(context::signalNoMoreSplits);
+    }
+
+    @Override
+    public void addSplitsBack(List splits, int subtaskId) {
+        log.debug("Add back splits {} to InfluxDBSourceSplitEnumerator.",
+                splits);
+        if (!splits.isEmpty()) {
+            addPendingSplit(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        log.debug("Register reader {} to InfluxDBSourceSplitEnumerator.",
+                subtaskId);
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public InfluxDBSourceState snapshotState(long checkpointId) {
+        synchronized (stateLock) {
+            return new InfluxDBSourceState(shouldEnumerate, pendingSplit);
+        }
+    }
+
+    private Set<InfluxDBSourceSplit> getInfluxDBSplit() {
+        String sql = config.getSql();
+        Set<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<>();
+        // no need numPartitions, use one partition
+        if (config.getPartitionNum() == 0) {
+            influxDBSourceSplits.add(new InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql));
+            return influxDBSourceSplits;
+        }
+        //calculate numRange base on (lowerBound upperBound partitionNum)
+        List<Pair<Long, Long>> rangePairs = genSplitNumRange(config.getLowerBound(), config.getUpperBound(), config.getPartitionNum());
+
+        String[] sqls = sql.split(SQL_WHERE);
+        if (sqls.length > 2) {
+            throw new IllegalArgumentException("sql should not contain more than one where");
+        }
+
+        int i = 0;
+        while (i < rangePairs.size()) {
+            String query = " where (" + config.getSplitKey() + " >= " + rangePairs.get(i).getLeft() + " and " + config.getSplitKey()  + " < " + rangePairs.get(i).getRight() + ") ";
+            i++;
+            query = sqls[0] + query;
+            if (sqls.length > 1) {
+                query = query + " and ( " + sqls[1] + " ) ";
+            }
+            influxDBSourceSplits.add(new InfluxDBSourceSplit(String.valueOf(i + System.nanoTime()), query));
+        }
+        return influxDBSourceSplits;
+    }
+
+    public static List<Pair<Long, Long>> genSplitNumRange(long lowerBound, long upperBound, int splitNum) {
+        List<Pair<Long, Long>> rangeList = new ArrayList<>();
+        int numPartitions = splitNum;
+        int size = (int) (upperBound - lowerBound) / numPartitions + 1;
+        int remainder = (int) ((upperBound + 1 - lowerBound) % numPartitions);
+        if (upperBound - lowerBound < numPartitions) {
+            numPartitions = (int) (upperBound - lowerBound);
+        }
+        long currentStart = lowerBound;
+        int i = 0;
+        while (i < numPartitions) {
+            rangeList.add(Pair.of(currentStart, currentStart + size));
+            i++;
+            currentStart += size;
+            if (i + 1 <= numPartitions) {
+                currentStart = currentStart - remainder;
+            }
+        }
+        return rangeList;
+    }
+
+    private void addPendingSplit(Collection<InfluxDBSourceSplit> splits) {
+        int readerCount = context.currentParallelism();
+        for (InfluxDBSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            log.info("Assigning {} to {} reader.", split, ownerReader);
+            pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>())
+                    .add(split);
+        }
+    }
+
+    private void assignSplit(Collection<Integer> readers) {
+        log.debug("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<InfluxDBSourceSplit> assignmentForReader = pendingSplit.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+                log.info("Assign splits {} to reader {}",
+                        assignmentForReader, reader);
+                try {
+                    context.assignSplit(reader, assignmentForReader);
+                } catch (Exception e) {
+                    log.error("Failed to assign splits {} to reader {}",
+                            assignmentForReader, reader, e);
+                    pendingSplit.put(reader, assignmentForReader);
+                }
+            }
+        }
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    @Override
+    public void open() {
+        //nothing to do
+    }
+
+    @Override
+    public void close() {
+        //nothing to do
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        //nothing to do
+
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        throw new UnsupportedOperationException("Unsupported handleSplitRequest: " + subtaskId);
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
new file mode 100644
index 000000000..090b8bf97
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBRowConverter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+@Slf4j
+public class InfluxdbSourceReader implements SourceReader<SeaTunnelRow, InfluxDBSourceSplit> {
+    private InfluxDB influxDB;
+    InfluxDBConfig config;
+
+    private final SourceReader.Context context;
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    List<Integer> columnsIndexList;
+    private final Queue<InfluxDBSourceSplit> pendingSplits;
+
+    private volatile boolean noMoreSplitsAssignment;
+
+    InfluxdbSourceReader(InfluxDBConfig config, Context readerContext, SeaTunnelRowType seaTunnelRowType, List<Integer> columnsIndexList) {
+        this.config = config;
+        this.pendingSplits = new LinkedList<>();
+        this.context = readerContext;
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.columnsIndexList = columnsIndexList;
+    }
+
+    public void connect() throws ConnectException {
+        if (influxDB == null) {
+            influxDB = InfluxDBClient.getInfluxDB(config);
+            String version = influxDB.version();
+            if (!influxDB.ping().isGood()) {
+                String errorMessage =
+                        String.format(
+                                "connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}",
+                                config.getUrl());
+                throw new ConnectException(errorMessage);
+            }
+            log.info("connect influxdb successful. sever version :{}.", version);
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        connect();
+    }
+
+    @Override
+    public void close() {
+        if (influxDB != null) {
+            influxDB.close();
+            influxDB = null;
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) {
+        while (!pendingSplits.isEmpty()) {
+            synchronized (output.getCheckpointLock()) {
+                InfluxDBSourceSplit split = pendingSplits.poll();
+                read(split, output);
+            }
+        }
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())
+                && noMoreSplitsAssignment
+                && pendingSplits.isEmpty()) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded influxDB source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public List<InfluxDBSourceSplit> snapshotState(long checkpointId) {
+        return new ArrayList<>(pendingSplits);
+    }
+
+    @Override
+    public void addSplits(List<InfluxDBSourceSplit> splits) {
+        pendingSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        log.info("Reader received NoMoreSplits event.");
+        noMoreSplitsAssignment = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId)  {
+
+    }
+
+    private void read(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
+        QueryResult queryResult = influxDB.query(new Query(split.getQuery(), config.getDatabase()));
+        for (QueryResult.Result result : queryResult.getResults()) {
+            List<QueryResult.Series> serieList = result.getSeries();
+            if (CollectionUtils.isNotEmpty(serieList)) {
+                for (QueryResult.Series series : serieList) {
+                    for (List<Object> values : series.getValues()) {
+                        SeaTunnelRow row = InfluxDBRowConverter.convert(values, seaTunnelRowType, columnsIndexList);
+                        output.collect(row);
+                    }
+                }
+            } else {
+                log.debug(
+                        "split[{}] reader influxDB series is empty.", split.splitId());
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/state/InfluxDBSourceState.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/state/InfluxDBSourceState.java
new file mode 100644
index 000000000..3d9852bba
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/state/InfluxDBSourceState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.state;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@AllArgsConstructor
+@Getter
+public class InfluxDBSourceState implements Serializable {
+
+    private boolean shouldEnumerate;
+    private Map<Integer, List<InfluxDBSourceSplit>> pendingSplit;
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index df037152e..37dd5223c 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -55,6 +55,7 @@
         <module>connector-sentry</module>
         <module>connector-mongodb</module>
         <module>connector-iceberg</module>
+        <module>connector-influxdb</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml
similarity index 62%
copy from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml
index 08bfa545f..83353aa6c 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml
@@ -13,36 +13,31 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-e2e</artifactId>
+        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
 
-    <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
-
-    <modules>
-        <module>connector-spark-e2e-base</module>
-        <module>connector-datahub-spark-e2e</module>
-        <module>connector-fake-spark-e2e</module>
-        <module>connector-file-spark-e2e</module>
-        <module>connector-iotdb-spark-e2e</module>
-        <module>connector-jdbc-spark-e2e</module>
-        <module>connector-mongodb-spark-e2e</module>
-    </modules>
+    <artifactId>connector-influxdb-flink-e2e</artifactId>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-starter</artifactId>
+            <artifactId>connector-flink-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-influxdb</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
-
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java
new file mode 100644
index 000000000..336d1743d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.influxdb;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class InfluxDBSourceToAssertIT extends FlinkContainer {
+
+    private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8";
+    private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host";
+    private static final String INFLUXDB_HOST = "localhost";
+
+    private static final int INFLUXDB_PORT = 8764;
+    private static final int INFLUXDB_CONTAINER_PORT = 8086;
+    private static final String INFLUXDB_CONNECT_URL = String.format("http://%s:%s", INFLUXDB_HOST, INFLUXDB_PORT);
+    private static final String INFLUXDB_DATABASE = "test";
+    private static final String INFLUXDB_MEASUREMENT = "test";
+
+    private GenericContainer<?> influxDBServer;
+
+    private  InfluxDB influxDB;
+
+    @BeforeEach
+    public void startInfluxDBContainer() throws ClassNotFoundException, SQLException, ConnectException {
+        influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(INFLUXDB_CONTAINER_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        influxDBServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", INFLUXDB_PORT, INFLUXDB_CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(influxDBServer)).join();
+        log.info("influxdb container started");
+        initializeInfluxDBClient();
+        batchInsertData();
+    }
+
+    @Test
+    public void testInfluxDBSource() throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/influxdb/influxdb_source_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    private void initializeInfluxDBClient() throws SQLException, ClassNotFoundException, ConnectException {
+        InfluxDBConfig influxDBConfig = new InfluxDBConfig(INFLUXDB_CONNECT_URL);
+        influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
+    }
+
+    public void batchInsertData() {
+        influxDB.createDatabase(INFLUXDB_DATABASE);
+        BatchPoints batchPoints = BatchPoints
+                .database(INFLUXDB_DATABASE)
+                .build();
+        for (int i = 0; i < 100; i++) {
+            Point point = Point.measurement(INFLUXDB_MEASUREMENT)
+                    .time(new Date().getTime(), TimeUnit.NANOSECONDS)
+                    .tag("label", String.format("label_%s", i))
+                    .addField("f1", String.format("f1_%s", i))
+                    .addField("f2", Double.valueOf(i + 1))
+                    .addField("f3", Long.valueOf(i + 2))
+                    .addField("f4", Float.valueOf(i + 3))
+                    .addField("f5", Integer.valueOf(i))
+                    .addField("f6", (short) (i + 4))
+                    .addField("f7", i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE)
+                    .build();
+            batchPoints.point(point);
+        }
+        influxDB.write(batchPoints);
+    }
+
+    @AfterEach
+    public void closeInfluxDBContainer() {
+        if (influxDBServer != null) {
+            influxDBServer.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf
new file mode 100644
index 000000000..eb2a1e6c5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf
@@ -0,0 +1,180 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "f5"
+        fields {
+            label = STRING
+            f1 = STRING
+            f2 = DOUBLE
+            f3 = BIGINT
+            f4 = FLOAT
+            f5 = INT
+            f6 = SMALLINT
+            f7 = BOOLEAN
+            }
+    }
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+     Assert {
+         rules =
+          {
+           field_rules =  [{
+                 field_name = f1
+                 field_type = string
+                 field_value = [
+                     {
+                         rule_type = NOT_NULL
+                     },
+                     {
+                         rule_type = MIN_LENGTH
+                         rule_value = 4
+                     },
+                     {
+                          rule_type = MAX_LENGTH
+                          rule_value = 5
+                     }
+                 ]
+             },{
+                 field_name = f2
+                 field_type = double
+                 field_value = [
+                     {
+                         rule_type = NOT_NULL
+                     },
+                     {
+                         rule_type = MIN
+                         rule_value = 1
+                     },
+                     {
+                          rule_type = MAX
+                          rule_value = 100
+                     }
+                 ]
+             },{
+                 field_name = f3
+                 field_type = long
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 2
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 101
+                       }
+                 ]
+             },{
+                 field_name = f4
+                 field_type = float
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 3
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 102
+                       }
+                 ]
+             },{
+                 field_name = f5
+                 field_type = int
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 1
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 99
+                       }
+                 ]
+             },{
+                 field_name = f6
+                 field_type = short
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 4
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 103
+                       }
+                 ]
+             },{
+                 field_name = f7
+                 field_type = boolean
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 0
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 1
+                       }
+                 ]
+             }
+             ]
+        }
+     }
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index b492ce56c..95d24c889 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -36,6 +36,7 @@
         <module>connector-fake-flink-e2e</module>
         <module>connector-mongodb-flink-e2e</module>
         <module>connector-iceberg-flink-e2e</module>
+        <module>connector-influxdb-flink-e2e</module>
     </modules>
 
     <dependencies>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml
similarity index 59%
copy from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml
index b492ce56c..11fd1b931 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml
@@ -13,37 +13,38 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <parent>
-        <artifactId>seatunnel-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
 
-    <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
-
-    <modules>
-        <module>connector-flink-e2e-base</module>
-        <module>connector-file-flink-e2e</module>
-        <module>connector-jdbc-flink-e2e</module>
-        <module>connector-iotdb-flink-e2e</module>
-        <module>connector-datahub-flink-e2e</module>
-        <module>connector-assert-flink-e2e</module>
-        <module>connector-fake-flink-e2e</module>
-        <module>connector-mongodb-flink-e2e</module>
-        <module>connector-iceberg-flink-e2e</module>
-    </modules>
+    <artifactId>connector-influxdb-spark-e2e</artifactId>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-starter</artifactId>
+            <artifactId>connector-spark-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-influxdb</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java
new file mode 100644
index 000000000..6283f3e82
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.influxdb;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class InfluxDBSourceToAssertIT extends SparkContainer {
+
+    private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8";
+    private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host";
+    private static final String INFLUXDB_HOST = "localhost";
+
+    private static final int INFLUXDB_PORT = 8764;
+    private static final int INFLUXDB_CONTAINER_PORT = 8086;
+    private static final String INFLUXDB_CONNECT_URL = String.format("http://%s:%s", INFLUXDB_HOST, INFLUXDB_PORT);
+    private static final String INFLUXDB_DATABASE = "test";
+    private static final String INFLUXDB_MEASUREMENT = "test";
+
+    private GenericContainer<?> influxDBServer;
+
+    private  InfluxDB influxDB;
+
+    @BeforeEach
+    public void startInfluxDBContainer() throws ClassNotFoundException, SQLException, ConnectException {
+        influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(INFLUXDB_CONTAINER_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        influxDBServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", INFLUXDB_PORT, INFLUXDB_CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(influxDBServer)).join();
+        log.info("influxdb container started");
+        initializeInfluxDBClient();
+        batchInsertData();
+    }
+
+    @Test
+    public void testInfluxDBSource() throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/influxdb/influxdb_source_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    private void initializeInfluxDBClient() throws SQLException, ClassNotFoundException, ConnectException {
+        InfluxDBConfig influxDBConfig = new InfluxDBConfig(INFLUXDB_CONNECT_URL);
+        influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
+    }
+
+    public void batchInsertData() {
+        influxDB.createDatabase(INFLUXDB_DATABASE);
+        BatchPoints batchPoints = BatchPoints
+                .database(INFLUXDB_DATABASE)
+                .build();
+        for (int i = 0; i < 100; i++) {
+            Point point = Point.measurement(INFLUXDB_MEASUREMENT)
+                    .time(new Date().getTime(), TimeUnit.NANOSECONDS)
+                    .tag("label", String.format("label_%s", i))
+                    .addField("f1", String.format("f1_%s", i))
+                    .addField("f2", Double.valueOf(i + 1))
+                    .addField("f3", Long.valueOf(i + 2))
+                    .addField("f4", Float.valueOf(i + 3))
+                    .addField("f5", Integer.valueOf(i))
+                    .addField("f6", (short) (i + 4))
+                    .addField("f7", i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE)
+                    .build();
+            batchPoints.point(point);
+        }
+        influxDB.write(batchPoints);
+    }
+
+    @AfterEach
+    public void closeInfluxDBContainer() {
+        if (influxDBServer != null) {
+            influxDBServer.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf
new file mode 100644
index 000000000..f8901980d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf
@@ -0,0 +1,182 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "f5"
+        fields {
+            label = STRING
+            f1 = STRING
+            f2 = DOUBLE
+            f3 = BIGINT
+            f4 = FLOAT
+            f5 = INT
+            f6 = SMALLINT
+            f7 = BOOLEAN
+            }
+    }
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+     Assert {
+         rules =
+          {
+           field_rules =  [{
+                 field_name = f1
+                 field_type = string
+                 field_value = [
+                     {
+                         rule_type = NOT_NULL
+                     },
+                     {
+                         rule_type = MIN_LENGTH
+                         rule_value = 4
+                     },
+                     {
+                          rule_type = MAX_LENGTH
+                          rule_value = 5
+                     }
+                 ]
+             },{
+                 field_name = f2
+                 field_type = double
+                 field_value = [
+                     {
+                         rule_type = NOT_NULL
+                     },
+                     {
+                         rule_type = MIN
+                         rule_value = 1
+                     },
+                     {
+                          rule_type = MAX
+                          rule_value = 100
+                     }
+                 ]
+             },{
+                 field_name = f3
+                 field_type = long
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 2
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 101
+                       }
+                 ]
+             },{
+                 field_name = f4
+                 field_type = float
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 3
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 102
+                       }
+                 ]
+             },{
+                 field_name = f5
+                 field_type = int
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 1
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 99
+                       }
+                 ]
+             },{
+                 field_name = f6
+                 field_type = short
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 4
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 103
+                       }
+                 ]
+             },{
+                 field_name = f7
+                 field_type = boolean
+                 field_value = [
+                      {
+                           rule_type = NOT_NULL
+                      },
+                      {
+                           rule_type = MIN
+                           rule_value = 0
+                       },
+                       {
+                            rule_type = MAX
+                            rule_value = 1
+                       }
+                 ]
+             }
+             ]
+        }
+     }
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 08bfa545f..64fc92056 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
         <module>connector-iotdb-spark-e2e</module>
         <module>connector-jdbc-spark-e2e</module>
         <module>connector-mongodb-spark-e2e</module>
+        <module>connector-influxdb-spark-e2e</module>
     </modules>
 
     <dependencies>