You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/14 02:39:31 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add influxDB connector source (#2697)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1d70ea308 [Feature][Connector-V2] Add influxDB connector source (#2697)
1d70ea308 is described below
commit 1d70ea30847abd36c22f602e70c95fae2c4f7ab0
Author: Bibo <33...@users.noreply.github.com>
AuthorDate: Fri Oct 14 10:39:25 2022 +0800
[Feature][Connector-V2] Add influxDB connector source (#2697)
* [Feature][Connector-V2] Add influxDB connector source
---
docs/en/connector-v2/source/InfluxDB.md | 170 ++++++++++++++++
plugin-mapping.properties | 1 +
seatunnel-connectors-v2/connector-influxdb/pom.xml | 82 ++++++++
.../seatunnel/influxdb/client/InfluxDBClient.java | 78 ++++++++
.../seatunnel/influxdb/config/InfluxDBConfig.java | 114 +++++++++++
.../influxdb/converter/InfluxDBRowConverter.java | 66 +++++++
.../seatunnel/influxdb/source/InfluxDBSource.java | 124 ++++++++++++
.../influxdb/source/InfluxDBSourceSplit.java | 40 ++++
.../source/InfluxDBSourceSplitEnumerator.java | 215 +++++++++++++++++++++
.../influxdb/source/InfluxdbSourceReader.java | 147 ++++++++++++++
.../influxdb/state/InfluxDBSourceState.java | 35 ++++
seatunnel-connectors-v2/pom.xml | 1 +
.../connector-influxdb-flink-e2e}/pom.xml | 33 ++--
.../v2/influxdb/InfluxDBSourceToAssertIT.java | 115 +++++++++++
.../influxdb/influxdb_source_to_assert.conf | 180 +++++++++++++++++
.../src/test/resources/log4j.properties | 22 +++
.../seatunnel-flink-connector-v2-e2e/pom.xml | 1 +
.../connector-influxdb-spark-e2e}/pom.xml | 41 ++--
.../v2/influxdb/InfluxDBSourceToAssertIT.java | 115 +++++++++++
.../influxdb/influxdb_source_to_assert.conf | 182 +++++++++++++++++
.../src/test/resources/log4j.properties | 22 +++
.../seatunnel-spark-connector-v2-e2e/pom.xml | 1 +
22 files changed, 1746 insertions(+), 39 deletions(-)
diff --git a/docs/en/connector-v2/source/InfluxDB.md b/docs/en/connector-v2/source/InfluxDB.md
new file mode 100644
index 000000000..ab3731a55
--- /dev/null
+++ b/docs/en/connector-v2/source/InfluxDB.md
@@ -0,0 +1,170 @@
+# InfluxDB
+
+> InfluxDB source connector
+
+## Description
+
+Read external data source data through InfluxDB.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+
+## Options
+
+| name | type | required | default value |
+|--------------------|--------|----------|---------------|
+| url | string | yes | - |
+| sql | string | yes | - |
+| fields | config | yes | - |
+| database | string | yes | |
+| username | string | no | - |
+| password | string | no | - |
+| lower_bound | long | no | - |
+| upper_bound | long | no | - |
+| partition_num | int | no | - |
+| split_column | string | no | - |
+| epoch | string | no | n |
+| connect_timeout_ms | long | no | 15000 |
+| query_timeout_sec | int | no | 3 |
+
+### url
+the url to connect to influxDB e.g.
+```
+http://influxdb-host:8086
+```
+
+### sql [string]
+The query sql used to search data
+
+```
+select name,age from test
+```
+
+### fields [string]
+
+the fields of the InfluxDB when you select
+
+the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
+
+e.g.
+
+```
+fields{
+ name=STRING
+ age=INT
+ }
+```
+
+### database [string]
+
+The `influxDB` database
+
+### username [string]
+
+the username of the influxDB when you select
+
+### password [string]
+
+the password of the influxDB when you select
+
+### split_column [string]
+
+the `split_column` of the influxDB when you select
+
+> Tips:
+> - influxDB tags is not supported as a segmented primary key because the type of tags can only be a string
+> - influxDB time is not supported as a segmented primary key because the time field cannot participate in mathematical calculation
+> - Currently, `split_column` only supports integer data segmentation, and does not support `float`, `string`, `date` and other types.
+
+### upper_bound [long]
+
+upper bound of the `split_column`column
+
+### lower_bound [long]
+
+lower bound of the `split_column` column
+
+```
+ split the $split_column range into $partition_num parts
+ if partition_num is 1, use the whole `split_column` range
+ if partition_num < (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions
+
+ eg: lower_bound = 1, upper_bound = 10, partition_num = 2
+ sql = "select * from test where age > 0 and age < 10"
+
+ split result
+
+ split 1: select * from test where ($split_column >= 1 and $split_column < 6) and ( age > 0 and age < 10 )
+
+ split 2: select * from test where ($split_column >= 6 and $split_column < 11) and ( age > 0 and age < 10 )
+
+```
+
+### partition_num [int]
+
+the `partition_num` of the InfluxDB when you select
+> Tips: Ensure that `upper_bound` minus `lower_bound` is divided `bypartition_num`, otherwise the query results will overlap
+
+### epoch [string]
+returned time precision
+- Optional values: H, m, s, MS, u, n
+- default value: n
+
+### query_timeout_sec [int]
+the `query_timeout` of the InfluxDB when you select, in seconds
+
+### connect_timeout_ms [long]
+the timeout for connecting to InfluxDB, in milliseconds
+
+## Examples
+Example of multi parallelism and multi partition scanning
+```hocon
+source {
+
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ sql = "select label, value, rt, time from test"
+ database = "test"
+ upper_bound = 100
+ lower_bound = 1
+ partition_num = 4
+ split_column = "value"
+ fields {
+ label = STRING
+ value = INT
+ rt = STRING
+ time = BIGINT
+ }
+ }
+
+}
+
+```
+Example of not using partition scan
+```hocon
+source {
+
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ sql = "select label, value, rt, time from test"
+ database = "test"
+ fields {
+ label = STRING
+ value = INT
+ rt = STRING
+ time = BIGINT
+ }
+ }
+
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index bf4b0ce5f..0b786891e 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -130,3 +130,4 @@ seatunnel.sink.Sentry = connector-sentry
seatunnel.source.MongoDB = connector-mongodb
seatunnel.sink.MongoDB = connector-mongodb
seatunnel.source.Iceberg = connector-iceberg
+seatunnel.source.influxdb = connector-influxdb
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-influxdb/pom.xml b/seatunnel-connectors-v2/connector-influxdb/pom.xml
new file mode 100644
index 000000000..0ed188cc1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-influxdb</artifactId>
+
+ <properties>
+ <influxdb.version>2.21</influxdb.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.influxdb</groupId>
+ <artifactId>influxdb-java</artifactId>
+ <version>${influxdb.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>okio</pattern>
+ <shadedPattern>shaded.okio</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>okhttp3</pattern>
+ <shadedPattern>shaded.okhttp3</shadedPattern>
+ </relocation>
+ </relocations>
+ <shadeSourcesContent>false</shadeSourcesContent>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
new file mode 100644
index 000000000..8743d5aa2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.client;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.HttpUrl;
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.impl.InfluxDBImpl;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class InfluxDBClient {
+ public static InfluxDB getInfluxDB(InfluxDBConfig config) throws ConnectException {
+ OkHttpClient.Builder clientBuilder =
+ new OkHttpClient.Builder()
+ .connectTimeout(config.getConnectTimeOut(), TimeUnit.MILLISECONDS)
+ .readTimeout(config.getQueryTimeOut(), TimeUnit.SECONDS);
+ InfluxDB.ResponseFormat format = InfluxDB.ResponseFormat.valueOf(config.getFormat());
+ clientBuilder.addInterceptor(
+ new Interceptor() {
+ @Override
+ public Response intercept(Chain chain) throws IOException {
+ Request request = chain.request();
+ HttpUrl httpUrl =
+ request.url()
+ .newBuilder()
+ //set epoch
+ .addQueryParameter("epoch", config.getEpoch())
+ .build();
+ Request build = request.newBuilder().url(httpUrl).build();
+ Response response = chain.proceed(build);
+ return response;
+ }
+ });
+ InfluxDB influxDB =
+ new InfluxDBImpl(
+ config.getUrl(),
+ StringUtils.isEmpty(config.getUsername()) ? StringUtils.EMPTY : config.getUsername(),
+ StringUtils.isEmpty(config.getPassword()) ? StringUtils.EMPTY : config.getPassword(),
+ clientBuilder,
+ format);
+ String version = influxDB.version();
+ if (!influxDB.ping().isGood()) {
+ String errorMessage =
+ String.format(
+ "connect influxdb failed, the url is: {%s}",
+ config.getUrl());
+ throw new ConnectException(errorMessage);
+ }
+ log.info("connect influxdb successful. sever version :{}.", version);
+ return influxDB;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
new file mode 100644
index 000000000..9a04e7d4a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class InfluxDBConfig implements Serializable {
+
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
+ public static final String URL = "url";
+ private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms";
+ private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec";
+
+ public static final String SQL = "sql";
+ public static final String SQL_WHERE = "where";
+
+ public static final String DATABASES = "database";
+ public static final String SPLIT_COLUMN = "split_column";
+ private static final String PARTITION_NUM = "partition_num";
+ private static final String UPPER_BOUND = "upper_bound";
+ private static final String LOWER_BOUND = "lower_bound";
+
+
+ private static final String DEFAULT_FORMAT = "MSGPACK";
+ private static final String EPOCH = "epoch";
+
+ public static final String DEFAULT_PARTITIONS = "0";
+ private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3;
+ private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000;
+
+ private static final String DEFAULT_EPOCH = "n";
+
+ private String url;
+ private String username;
+ private String password;
+ private String sql;
+ private int partitionNum = 0;
+ private String splitKey;
+ private long lowerBound;
+ private long upperBound;
+ private String database;
+
+ private String format = DEFAULT_FORMAT;
+ private int queryTimeOut = DEFAULT_QUERY_TIMEOUT_SEC;
+ private long connectTimeOut = DEFAULT_CONNECT_TIMEOUT_MS;
+
+ private String epoch = DEFAULT_EPOCH;
+
+ List<Integer> columnsIndex;
+
+ public InfluxDBConfig(Config config) {
+ this.url = config.getString(URL);
+ this.sql = config.getString(SQL);
+
+ if (config.hasPath(USERNAME)) {
+ this.username = config.getString(USERNAME);
+ }
+ if (config.hasPath(PASSWORD)) {
+ this.password = config.getString(PASSWORD);
+ }
+ if (config.hasPath(PARTITION_NUM)) {
+ this.partitionNum = config.getInt(PARTITION_NUM);
+ }
+ if (config.hasPath(UPPER_BOUND)) {
+ this.upperBound = config.getInt(UPPER_BOUND);
+ }
+ if (config.hasPath(LOWER_BOUND)) {
+ this.lowerBound = config.getInt(LOWER_BOUND);
+ }
+ if (config.hasPath(SPLIT_COLUMN)) {
+ this.splitKey = config.getString(SPLIT_COLUMN);
+ }
+ if (config.hasPath(DATABASES)) {
+ this.database = config.getString(DATABASES);
+ }
+ if (config.hasPath(EPOCH)) {
+ this.epoch = config.getString(EPOCH);
+ }
+ if (config.hasPath(CONNECT_TIMEOUT_MS)) {
+ this.connectTimeOut = config.getLong(CONNECT_TIMEOUT_MS);
+ }
+ if (config.hasPath(QUERY_TIMEOUT_SEC)) {
+ this.queryTimeOut = config.getInt(QUERY_TIMEOUT_SEC);
+ }
+ }
+
+ @VisibleForTesting
+ public InfluxDBConfig(String url) {
+ this.url = url;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
new file mode 100644
index 000000000..405ab2d43
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InfluxDBRowConverter {
+
+ public static SeaTunnelRow convert(List<Object> values, SeaTunnelRowType typeInfo, List<Integer> indexList) {
+
+ SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+ List<Object> fields = new ArrayList<>(seaTunnelDataTypes.length);
+
+ for (int i = 0; i <= seaTunnelDataTypes.length - 1; i++) {
+ Object seaTunnelField;
+ int columnIndex = indexList.get(i);
+ SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i];
+ SqlType fieldSqlType = seaTunnelDataType.getSqlType();
+ if (null == values.get(columnIndex)) {
+ seaTunnelField = null;
+ }
+ else if (SqlType.BOOLEAN.equals(fieldSqlType)) {
+ seaTunnelField = Boolean.parseBoolean(values.get(columnIndex).toString());
+ } else if (SqlType.SMALLINT.equals(fieldSqlType)) {
+ seaTunnelField = Short.valueOf(values.get(columnIndex).toString());
+ } else if (SqlType.INT.equals(fieldSqlType)) {
+ seaTunnelField = Integer.valueOf(values.get(columnIndex).toString());
+ } else if (SqlType.BIGINT.equals(fieldSqlType)) {
+ seaTunnelField = Long.valueOf(values.get(columnIndex).toString());
+ } else if (SqlType.FLOAT.equals(fieldSqlType)) {
+ seaTunnelField = ((Double) values.get(columnIndex)).floatValue();
+ } else if (SqlType.DOUBLE.equals(fieldSqlType)) {
+ seaTunnelField = values.get(columnIndex);
+ } else if (SqlType.STRING.equals(fieldSqlType)) {
+ seaTunnelField = values.get(columnIndex);
+ } else {
+ throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
+ }
+
+ fields.add(seaTunnelField);
+ }
+
+ return new SeaTunnelRow(fields.toArray());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
new file mode 100644
index 000000000..bc971476b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@AutoService(SeaTunnelSource.class)
+public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, InfluxDBSourceSplit, InfluxDBSourceState> {
+ private SeaTunnelRowType typeInfo;
+ private InfluxDBConfig influxDBConfig;
+
+ private List<Integer> columnsIndexList;
+
+ private static final String QUERY_LIMIT = " limit 1";
+
+ @Override
+ public String getPluginName() {
+ return "InfluxDB";
+ }
+
+ @Override
+ public void prepare(Config config) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(config, URL, SQL);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+ }
+ try {
+ this.influxDBConfig = new InfluxDBConfig(config);
+ SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(config);
+ this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
+ this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(influxDBConfig));
+ } catch (Exception e) {
+ throw new PrepareFailException("InfluxDB", PluginType.SOURCE, e.toString());
+ }
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SeaTunnelDataType getProducedType() {
+ return typeInfo;
+ }
+
+ @Override
+ public SourceReader createReader(SourceReader.Context readerContext) throws Exception {
+ return new InfluxdbSourceReader(influxDBConfig, readerContext, typeInfo, columnsIndexList);
+ }
+
+ @Override
+ public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception {
+ return new InfluxDBSourceSplitEnumerator(enumeratorContext, influxDBConfig);
+ }
+
+ @Override
+ public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> restoreEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> enumeratorContext, InfluxDBSourceState checkpointState) throws Exception {
+ return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, influxDBConfig);
+ }
+
+ private List<Integer> initColumnsIndex(InfluxDB influxDB) {
+ //query one row to get column info
+ String query = influxDBConfig.getSql() + QUERY_LIMIT;
+ List<String> fieldNames = new ArrayList<>();
+ try {
+ QueryResult queryResult = influxDB.query(
+ new Query(query, influxDBConfig.getDatabase()));
+
+ List<QueryResult.Series> serieList = queryResult.getResults().get(0).getSeries();
+ fieldNames.addAll(serieList.get(0).getColumns());
+
+ return Arrays.stream(typeInfo.getFieldNames()).map(x -> fieldNames.indexOf(x)).collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeException("get column index of query result exception", e);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
new file mode 100644
index 000000000..37dd688f8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class InfluxDBSourceSplit implements SourceSplit {
+ private String splitId;
+
+ private String query;
+
+ public InfluxDBSourceSplit(String splitId, String query) {
+ this.query = query;
+ this.splitId = splitId;
+ }
+
+ @Override
+ public String splitId() {
+ return splitId;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
new file mode 100644
index 000000000..d22eba116
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+ final InfluxDBConfig config;
+ private final Context<InfluxDBSourceSplit> context;
+ private final Map<Integer, List<InfluxDBSourceSplit>> pendingSplit;
+ private final Object stateLock = new Object();
+ private volatile boolean shouldEnumerate;
+
+ public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+ this(context, null, config);
+ }
+
+ public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+ this.context = context;
+ this.config = config;
+ this.pendingSplit = new HashMap<>();
+ this.shouldEnumerate = sourceState == null;
+ if (sourceState != null) {
+ this.shouldEnumerate = sourceState.isShouldEnumerate();
+ this.pendingSplit.putAll(sourceState.getPendingSplit());
+ }
+ }
+
+ @Override
+ public void run() {
+ Set<Integer> readers = context.registeredReaders();
+ if (shouldEnumerate) {
+ Set<InfluxDBSourceSplit> newSplits = getInfluxDBSplit();
+
+ synchronized (stateLock) {
+ addPendingSplit(newSplits);
+ shouldEnumerate = false;
+ }
+
+ assignSplit(readers);
+ }
+
+ log.debug("No more splits to assign." +
+ " Sending NoMoreSplitsEvent to reader {}.", readers);
+ readers.forEach(context::signalNoMoreSplits);
+ }
+
+ @Override
+ public void addSplitsBack(List splits, int subtaskId) {
+ log.debug("Add back splits {} to InfluxDBSourceSplitEnumerator.",
+ splits);
+ if (!splits.isEmpty()) {
+ addPendingSplit(splits);
+ assignSplit(Collections.singletonList(subtaskId));
+ }
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return pendingSplit.size();
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ log.debug("Register reader {} to InfluxDBSourceSplitEnumerator.",
+ subtaskId);
+ if (!pendingSplit.isEmpty()) {
+ assignSplit(Collections.singletonList(subtaskId));
+ }
+ }
+
+ @Override
+ public InfluxDBSourceState snapshotState(long checkpointId) {
+ synchronized (stateLock) {
+ return new InfluxDBSourceState(shouldEnumerate, pendingSplit);
+ }
+ }
+
+ private Set<InfluxDBSourceSplit> getInfluxDBSplit() {
+ String sql = config.getSql();
+ Set<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<>();
+ // no need numPartitions, use one partition
+ if (config.getPartitionNum() == 0) {
+ influxDBSourceSplits.add(new InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql));
+ return influxDBSourceSplits;
+ }
+ //calculate numRange base on (lowerBound upperBound partitionNum)
+ List<Pair<Long, Long>> rangePairs = genSplitNumRange(config.getLowerBound(), config.getUpperBound(), config.getPartitionNum());
+
+ String[] sqls = sql.split(SQL_WHERE);
+ if (sqls.length > 2) {
+ throw new IllegalArgumentException("sql should not contain more than one where");
+ }
+
+ int i = 0;
+ while (i < rangePairs.size()) {
+ String query = " where (" + config.getSplitKey() + " >= " + rangePairs.get(i).getLeft() + " and " + config.getSplitKey() + " < " + rangePairs.get(i).getRight() + ") ";
+ i++;
+ query = sqls[0] + query;
+ if (sqls.length > 1) {
+ query = query + " and ( " + sqls[1] + " ) ";
+ }
+ influxDBSourceSplits.add(new InfluxDBSourceSplit(String.valueOf(i + System.nanoTime()), query));
+ }
+ return influxDBSourceSplits;
+ }
+
+ public static List<Pair<Long, Long>> genSplitNumRange(long lowerBound, long upperBound, int splitNum) {
+ List<Pair<Long, Long>> rangeList = new ArrayList<>();
+ int numPartitions = splitNum;
+ int size = (int) (upperBound - lowerBound) / numPartitions + 1;
+ int remainder = (int) ((upperBound + 1 - lowerBound) % numPartitions);
+ if (upperBound - lowerBound < numPartitions) {
+ numPartitions = (int) (upperBound - lowerBound);
+ }
+ long currentStart = lowerBound;
+ int i = 0;
+ while (i < numPartitions) {
+ rangeList.add(Pair.of(currentStart, currentStart + size));
+ i++;
+ currentStart += size;
+ if (i + 1 <= numPartitions) {
+ currentStart = currentStart - remainder;
+ }
+ }
+ return rangeList;
+ }
+
+ private void addPendingSplit(Collection<InfluxDBSourceSplit> splits) {
+ int readerCount = context.currentParallelism();
+ for (InfluxDBSourceSplit split : splits) {
+ int ownerReader = getSplitOwner(split.splitId(), readerCount);
+ log.info("Assigning {} to {} reader.", split, ownerReader);
+ pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>())
+ .add(split);
+ }
+ }
+
+ private void assignSplit(Collection<Integer> readers) {
+ log.debug("Assign pendingSplits to readers {}", readers);
+
+ for (int reader : readers) {
+ List<InfluxDBSourceSplit> assignmentForReader = pendingSplit.remove(reader);
+ if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+ log.info("Assign splits {} to reader {}",
+ assignmentForReader, reader);
+ try {
+ context.assignSplit(reader, assignmentForReader);
+ } catch (Exception e) {
+ log.error("Failed to assign splits {} to reader {}",
+ assignmentForReader, reader, e);
+ pendingSplit.put(reader, assignmentForReader);
+ }
+ }
+ }
+ }
+
+ private static int getSplitOwner(String tp, int numReaders) {
+ return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+ }
+
+ @Override
+ public void open() {
+ //nothing to do
+ }
+
+ @Override
+ public void close() {
+ //nothing to do
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ //nothing to do
+
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+ throw new UnsupportedOperationException("Unsupported handleSplitRequest: " + subtaskId);
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
new file mode 100644
index 000000000..090b8bf97
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBRowConverter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+@Slf4j
+public class InfluxdbSourceReader implements SourceReader<SeaTunnelRow, InfluxDBSourceSplit> {
+ private InfluxDB influxDB;
+ InfluxDBConfig config;
+
+ private final SourceReader.Context context;
+
+ private SeaTunnelRowType seaTunnelRowType;
+
+ List<Integer> columnsIndexList;
+ private final Queue<InfluxDBSourceSplit> pendingSplits;
+
+ private volatile boolean noMoreSplitsAssignment;
+
+ InfluxdbSourceReader(InfluxDBConfig config, Context readerContext, SeaTunnelRowType seaTunnelRowType, List<Integer> columnsIndexList) {
+ this.config = config;
+ this.pendingSplits = new LinkedList<>();
+ this.context = readerContext;
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.columnsIndexList = columnsIndexList;
+ }
+
+ public void connect() throws ConnectException {
+ if (influxDB == null) {
+ influxDB = InfluxDBClient.getInfluxDB(config);
+ String version = influxDB.version();
+ if (!influxDB.ping().isGood()) {
+ String errorMessage =
+ String.format(
+ "connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}",
+ config.getUrl());
+ throw new ConnectException(errorMessage);
+ }
+ log.info("connect influxdb successful. sever version :{}.", version);
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ connect();
+ }
+
+ @Override
+ public void close() {
+ if (influxDB != null) {
+ influxDB.close();
+ influxDB = null;
+ }
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) {
+ while (!pendingSplits.isEmpty()) {
+ synchronized (output.getCheckpointLock()) {
+ InfluxDBSourceSplit split = pendingSplits.poll();
+ read(split, output);
+ }
+ }
+
+ if (Boundedness.BOUNDED.equals(context.getBoundedness())
+ && noMoreSplitsAssignment
+ && pendingSplits.isEmpty()) {
+ // signal to the source that we have reached the end of the data.
+ log.info("Closed the bounded influxDB source");
+ context.signalNoMoreElement();
+ }
+ }
+
+ @Override
+ public List<InfluxDBSourceSplit> snapshotState(long checkpointId) {
+ return new ArrayList<>(pendingSplits);
+ }
+
+ @Override
+ public void addSplits(List<InfluxDBSourceSplit> splits) {
+ pendingSplits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ log.info("Reader received NoMoreSplits event.");
+ noMoreSplitsAssignment = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+
+ }
+
+ private void read(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
+ QueryResult queryResult = influxDB.query(new Query(split.getQuery(), config.getDatabase()));
+ for (QueryResult.Result result : queryResult.getResults()) {
+ List<QueryResult.Series> serieList = result.getSeries();
+ if (CollectionUtils.isNotEmpty(serieList)) {
+ for (QueryResult.Series series : serieList) {
+ for (List<Object> values : series.getValues()) {
+ SeaTunnelRow row = InfluxDBRowConverter.convert(values, seaTunnelRowType, columnsIndexList);
+ output.collect(row);
+ }
+ }
+ } else {
+ log.debug(
+ "split[{}] reader influxDB series is empty.", split.splitId());
+ }
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/state/InfluxDBSourceState.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/state/InfluxDBSourceState.java
new file mode 100644
index 000000000..3d9852bba
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/state/InfluxDBSourceState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.state;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@AllArgsConstructor
+@Getter
+public class InfluxDBSourceState implements Serializable {
+
+ private boolean shouldEnumerate;
+ private Map<Integer, List<InfluxDBSourceSplit>> pendingSplit;
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index df037152e..37dd5223c 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -55,6 +55,7 @@
<module>connector-sentry</module>
<module>connector-mongodb</module>
<module>connector-iceberg</module>
+ <module>connector-influxdb</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml
similarity index 62%
copy from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml
index 08bfa545f..83353aa6c 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml
@@ -13,36 +13,31 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-e2e</artifactId>
+ <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
-
- <modules>
- <module>connector-spark-e2e-base</module>
- <module>connector-datahub-spark-e2e</module>
- <module>connector-fake-spark-e2e</module>
- <module>connector-file-spark-e2e</module>
- <module>connector-iotdb-spark-e2e</module>
- <module>connector-jdbc-spark-e2e</module>
- <module>connector-mongodb-spark-e2e</module>
- </modules>
+ <artifactId>connector-influxdb-flink-e2e</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-starter</artifactId>
+ <artifactId>connector-flink-e2e-base</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-influxdb</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
-
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java
new file mode 100644
index 000000000..336d1743d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.influxdb;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class InfluxDBSourceToAssertIT extends FlinkContainer {
+
+ private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8";
+ private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host";
+ private static final String INFLUXDB_HOST = "localhost";
+
+ private static final int INFLUXDB_PORT = 8764;
+ private static final int INFLUXDB_CONTAINER_PORT = 8086;
+ private static final String INFLUXDB_CONNECT_URL = String.format("http://%s:%s", INFLUXDB_HOST, INFLUXDB_PORT);
+ private static final String INFLUXDB_DATABASE = "test";
+ private static final String INFLUXDB_MEASUREMENT = "test";
+
+ private GenericContainer<?> influxDBServer;
+
+ private InfluxDB influxDB;
+
+ @BeforeEach
+ public void startInfluxDBContainer() throws ClassNotFoundException, SQLException, ConnectException {
+ influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INFLUXDB_CONTAINER_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ influxDBServer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", INFLUXDB_PORT, INFLUXDB_CONTAINER_PORT)));
+ Startables.deepStart(Stream.of(influxDBServer)).join();
+ log.info("influxdb container started");
+ initializeInfluxDBClient();
+ batchInsertData();
+ }
+
+ @Test
+ public void testInfluxDBSource() throws IOException, InterruptedException, SQLException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/influxdb/influxdb_source_to_assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ private void initializeInfluxDBClient() throws SQLException, ClassNotFoundException, ConnectException {
+ InfluxDBConfig influxDBConfig = new InfluxDBConfig(INFLUXDB_CONNECT_URL);
+ influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
+ }
+
+ public void batchInsertData() {
+ influxDB.createDatabase(INFLUXDB_DATABASE);
+ BatchPoints batchPoints = BatchPoints
+ .database(INFLUXDB_DATABASE)
+ .build();
+ for (int i = 0; i < 100; i++) {
+ Point point = Point.measurement(INFLUXDB_MEASUREMENT)
+ .time(new Date().getTime(), TimeUnit.NANOSECONDS)
+ .tag("label", String.format("label_%s", i))
+ .addField("f1", String.format("f1_%s", i))
+ .addField("f2", Double.valueOf(i + 1))
+ .addField("f3", Long.valueOf(i + 2))
+ .addField("f4", Float.valueOf(i + 3))
+ .addField("f5", Integer.valueOf(i))
+ .addField("f6", (short) (i + 4))
+ .addField("f7", i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE)
+ .build();
+ batchPoints.point(point);
+ }
+ influxDB.write(batchPoints);
+ }
+
+ @AfterEach
+ public void closeInfluxDBContainer() {
+ if (influxDBServer != null) {
+ influxDBServer.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf
new file mode 100644
index 000000000..eb2a1e6c5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf
@@ -0,0 +1,180 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
+ database = "test"
+ upper_bound = 100
+ lower_bound = 1
+ partition_num = 4
+ split_column = "f5"
+ fields {
+ label = STRING
+ f1 = STRING
+ f2 = DOUBLE
+ f3 = BIGINT
+ f4 = FLOAT
+ f5 = INT
+ f6 = SMALLINT
+ f7 = BOOLEAN
+ }
+ }
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ field_rules = [{
+ field_name = f1
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 4
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 5
+ }
+ ]
+ },{
+ field_name = f2
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 100
+ }
+ ]
+ },{
+ field_name = f3
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 2
+ },
+ {
+ rule_type = MAX
+ rule_value = 101
+ }
+ ]
+ },{
+ field_name = f4
+ field_type = float
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 3
+ },
+ {
+ rule_type = MAX
+ rule_value = 102
+ }
+ ]
+ },{
+ field_name = f5
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ },{
+ field_name = f6
+ field_type = short
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 4
+ },
+ {
+ rule_type = MAX
+ rule_value = 103
+ }
+ ]
+ },{
+ field_name = f7
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 1
+ }
+ ]
+ }
+ ]
+ }
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index b492ce56c..95d24c889 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -36,6 +36,7 @@
<module>connector-fake-flink-e2e</module>
<module>connector-mongodb-flink-e2e</module>
<module>connector-iceberg-flink-e2e</module>
+ <module>connector-influxdb-flink-e2e</module>
</modules>
<dependencies>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml
similarity index 59%
copy from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml
index b492ce56c..11fd1b931 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml
@@ -13,37 +13,38 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
- <artifactId>seatunnel-e2e</artifactId>
<groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
-
- <modules>
- <module>connector-flink-e2e-base</module>
- <module>connector-file-flink-e2e</module>
- <module>connector-jdbc-flink-e2e</module>
- <module>connector-iotdb-flink-e2e</module>
- <module>connector-datahub-flink-e2e</module>
- <module>connector-assert-flink-e2e</module>
- <module>connector-fake-flink-e2e</module>
- <module>connector-mongodb-flink-e2e</module>
- <module>connector-iceberg-flink-e2e</module>
- </modules>
+ <artifactId>connector-influxdb-spark-e2e</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-starter</artifactId>
+ <artifactId>connector-spark-e2e-base</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-influxdb</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java
new file mode 100644
index 000000000..6283f3e82
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.influxdb;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class InfluxDBSourceToAssertIT extends SparkContainer {
+
+ private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8";
+ private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host";
+ private static final String INFLUXDB_HOST = "localhost";
+
+ private static final int INFLUXDB_PORT = 8764;
+ private static final int INFLUXDB_CONTAINER_PORT = 8086;
+ private static final String INFLUXDB_CONNECT_URL = String.format("http://%s:%s", INFLUXDB_HOST, INFLUXDB_PORT);
+ private static final String INFLUXDB_DATABASE = "test";
+ private static final String INFLUXDB_MEASUREMENT = "test";
+
+ private GenericContainer<?> influxDBServer;
+
+ private InfluxDB influxDB;
+
+ @BeforeEach
+ public void startInfluxDBContainer() throws ClassNotFoundException, SQLException, ConnectException {
+ influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INFLUXDB_CONTAINER_HOST)
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ influxDBServer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", INFLUXDB_PORT, INFLUXDB_CONTAINER_PORT)));
+ Startables.deepStart(Stream.of(influxDBServer)).join();
+ log.info("influxdb container started");
+ initializeInfluxDBClient();
+ batchInsertData();
+ }
+
+ @Test
+ public void testInfluxDBSource() throws IOException, InterruptedException, SQLException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/influxdb/influxdb_source_to_assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ private void initializeInfluxDBClient() throws SQLException, ClassNotFoundException, ConnectException {
+ InfluxDBConfig influxDBConfig = new InfluxDBConfig(INFLUXDB_CONNECT_URL);
+ influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
+ }
+
+ public void batchInsertData() {
+ influxDB.createDatabase(INFLUXDB_DATABASE);
+ BatchPoints batchPoints = BatchPoints
+ .database(INFLUXDB_DATABASE)
+ .build();
+ for (int i = 0; i < 100; i++) {
+ Point point = Point.measurement(INFLUXDB_MEASUREMENT)
+ .time(new Date().getTime(), TimeUnit.NANOSECONDS)
+ .tag("label", String.format("label_%s", i))
+ .addField("f1", String.format("f1_%s", i))
+ .addField("f2", Double.valueOf(i + 1))
+ .addField("f3", Long.valueOf(i + 2))
+ .addField("f4", Float.valueOf(i + 3))
+ .addField("f5", Integer.valueOf(i))
+ .addField("f6", (short) (i + 4))
+ .addField("f7", i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE)
+ .build();
+ batchPoints.point(point);
+ }
+ influxDB.write(batchPoints);
+ }
+
+ @AfterEach
+ public void closeInfluxDBContainer() {
+ if (influxDBServer != null) {
+ influxDBServer.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf
new file mode 100644
index 000000000..f8901980d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf
@@ -0,0 +1,182 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ InfluxDB {
+ url = "http://influxdb-host:8086"
+ sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
+ database = "test"
+ upper_bound = 100
+ lower_bound = 1
+ partition_num = 4
+ split_column = "f5"
+ fields {
+ label = STRING
+ f1 = STRING
+ f2 = DOUBLE
+ f3 = BIGINT
+ f4 = FLOAT
+ f5 = INT
+ f6 = SMALLINT
+ f7 = BOOLEAN
+ }
+ }
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ field_rules = [{
+ field_name = f1
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 4
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 5
+ }
+ ]
+ },{
+ field_name = f2
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 100
+ }
+ ]
+ },{
+ field_name = f3
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 2
+ },
+ {
+ rule_type = MAX
+ rule_value = 101
+ }
+ ]
+ },{
+ field_name = f4
+ field_type = float
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 3
+ },
+ {
+ rule_type = MAX
+ rule_value = 102
+ }
+ ]
+ },{
+ field_name = f5
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ },{
+ field_name = f6
+ field_type = short
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 4
+ },
+ {
+ rule_type = MAX
+ rule_value = 103
+ }
+ ]
+ },{
+ field_name = f7
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 1
+ }
+ ]
+ }
+ ]
+ }
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 08bfa545f..64fc92056 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
<module>connector-iotdb-spark-e2e</module>
<module>connector-jdbc-spark-e2e</module>
<module>connector-mongodb-spark-e2e</module>
+ <module>connector-influxdb-spark-e2e</module>
</modules>
<dependencies>