You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2023/01/10 02:06:15 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] add tdengine source (#2832)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new acf4d5b1b [Feature][Connector-V2] add tdengine source (#2832)
acf4d5b1b is described below
commit acf4d5b1b469adf383b06266ef2e9815f472cd25
Author: Li Hongyu <lh...@gmail.com>
AuthorDate: Tue Jan 10 10:06:09 2023 +0800
[Feature][Connector-V2] add tdengine source (#2832)
* [Feature][Connector-V2]add tdengine source and sink
1. add ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY to statement
2. add tdengine e2e module
* SourceSplitEnumerator.Context
Co-authored-by: bjyflihongyu <li...@jd.com>
Co-authored-by: tyrantlucifer <ty...@gmail.com>
---
docs/en/connector-v2/sink/TDengine.md | 69 +++++++
docs/en/connector-v2/source/TDengine.md | 82 +++++++++
plugin-mapping.properties | 3 +
seatunnel-connectors-v2/connector-tdengine/pom.xml | 45 +++++
.../tdengine/config/TDengineSourceConfig.java | 79 ++++++++
.../exception/TDengineConnectorException.java | 36 ++++
.../seatunnel/tdengine/sink/TDengineSink.java | 63 +++++++
.../tdengine/sink/TDengineSinkWriter.java | 120 ++++++++++++
.../seatunnel/tdengine/source/TDengineSource.java | 141 ++++++++++++++
.../tdengine/source/TDengineSourceReader.java | 159 ++++++++++++++++
.../tdengine/source/TDengineSourceSplit.java | 46 +++++
.../source/TDengineSourceSplitEnumerator.java | 204 +++++++++++++++++++++
.../tdengine/state/TDengineSourceState.java | 36 ++++
.../tdengine/typemapper/TDengineTypeMapper.java | 153 ++++++++++++++++
.../seatunnel/tdengine/TDengineTest.java | 50 +++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 +
.../connector-tdengine-e2e/pom.xml | 47 +++++
.../e2e/connector/tdengine/TDengineIT.java | 198 ++++++++++++++++++++
.../tdengine/tdengine_source_to_sink.conf | 59 ++++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
.../source/batch/ParallelBatchPartitionReader.java | 4 +-
22 files changed, 1601 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/sink/TDengine.md b/docs/en/connector-v2/sink/TDengine.md
new file mode 100644
index 000000000..597a7a99b
--- /dev/null
+++ b/docs/en/connector-v2/sink/TDengine.md
@@ -0,0 +1,69 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine. The target super table (stable) must be created before running the SeaTunnel task.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|----------------------------|---------|----------|---------------|
+| url | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| database | string | yes | |
+| stable | string | yes | - |
+| timezone | string | no | UTC |
+
+### url [string]
+
+the JDBC url of the TDengine server, without the database name
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### timezone [string]
+
+the timezone of the TDengine server; it is important for writing the ts field correctly
+
+## Example
+
+### sink
+
+```hocon
+sink {
+ TDengine {
+ url : "jdbc:TAOS-RS://localhost:6041/"
+ username : "root"
+ password : "taosdata"
+ database : "power2"
+ stable : "meters2"
+ timezone: UTC
+ }
+}
+```
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/TDengine.md b/docs/en/connector-v2/source/TDengine.md
new file mode 100644
index 000000000..fea0a080d
--- /dev/null
+++ b/docs/en/connector-v2/source/TDengine.md
@@ -0,0 +1,82 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read data from TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|----------------------------|---------|----------|---------------|
+| url | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| database | string | yes | |
+| stable | string | yes | - |
+| lower_bound | string | yes | - |
+| upper_bound | string | yes | - |
+
+### url [string]
+
+the JDBC url of the TDengine server, without the database name
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
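+### lower_bound [string]
+
+the lower bound of the time range to read, given as a timestamp string, e.g. `2018-10-03 14:38:05.000`
+
+### upper_bound [string]
+
+the upper bound of the time range to read, given as a timestamp string, e.g. `2018-10-03 14:38:16.800`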
+
+## Example
+
+### source
+```hocon
+source {
+ TDengine {
+ url : "jdbc:TAOS-RS://localhost:6041/"
+ username : "root"
+ password : "taosdata"
+ database : "power"
+ stable : "meters"
+ lower_bound : "2018-10-03 14:38:05.000"
+ upper_bound : "2018-10-03 14:38:16.800"
+ result_table_name = "tdengine_result"
+ }
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c12aa0ec7..8e55c397c 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -166,4 +166,7 @@ seatunnel.source.Maxcompute = connector-maxcompute
seatunnel.sink.Maxcompute = connector-maxcompute
seatunnel.source.MySQL-CDC = connector-cdc-mysql
seatunnel.sink.S3Redshift = connector-s3-redshift
+seatunnel.source.TDengine = connector-tdengine
+seatunnel.sink.TDengine = connector-tdengine
seatunnel.source.Persistiq = connector-http-persistiq
+
diff --git a/seatunnel-connectors-v2/connector-tdengine/pom.xml b/seatunnel-connectors-v2/connector-tdengine/pom.xml
new file mode 100644
index 000000000..31db6a553
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-tdengine</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.taosdata.jdbc</groupId>
+ <artifactId>taos-jdbcdriver</artifactId>
+ <version>3.0.3</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
new file mode 100644
index 000000000..4bd7e1c60
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Connection and query options of the TDengine connector, read from the plugin config.
+ * <p>
+ * The same options object is used by the source and by the sink writer of this connector.
+ */
+@Data
+public class TDengineSourceConfig implements Serializable {
+
+    /**
+     * JDBC url without the database name, e.g. jdbc:TAOS-RS://localhost:6041/
+     */
+    private String url;
+    private String username;
+    private String password;
+    private String database;
+    private String stable;
+    // The timezone parameter of a 'jdbc:TAOS-RS' url only takes effect on the taosadapter side,
+    // not on the JDBC client side, so this option represents the server-side timezone setting.
+    private String timezone;
+    private String lowerBound;
+    private String upperBound;
+    // fields and tags are not filled in by buildSourceConfig; they stay null until a caller sets them
+    private List<String> fields;
+    private List<String> tags;
+
+    /**
+     * Builds the connector options from the plugin config.
+     *
+     * @param pluginConfig connector section of the job config
+     * @return options in which every absent entry is {@code null}, except {@code timezone},
+     *         which falls back to "UTC"
+     */
+    public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
+        TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
+        tdengineSourceConfig.setUrl(getStringOrDefault(pluginConfig, URL, null));
+        tdengineSourceConfig.setDatabase(getStringOrDefault(pluginConfig, DATABASE, null));
+        tdengineSourceConfig.setStable(getStringOrDefault(pluginConfig, STABLE, null));
+        tdengineSourceConfig.setUsername(getStringOrDefault(pluginConfig, USERNAME, null));
+        tdengineSourceConfig.setPassword(getStringOrDefault(pluginConfig, PASSWORD, null));
+        tdengineSourceConfig.setUpperBound(getStringOrDefault(pluginConfig, UPPER_BOUND, null));
+        tdengineSourceConfig.setLowerBound(getStringOrDefault(pluginConfig, LOWER_BOUND, null));
+        tdengineSourceConfig.setTimezone(getStringOrDefault(pluginConfig, TIMEZONE, "UTC"));
+        return tdengineSourceConfig;
+    }
+
+    /**
+     * Returns the string stored at {@code path}, or {@code defaultValue} when the path is not set.
+     */
+    private static String getStringOrDefault(Config pluginConfig, String path, String defaultValue) {
+        return pluginConfig.hasPath(path) ? pluginConfig.getString(path) : defaultValue;
+    }
+
+    /**
+     * Option keys accepted by the connector. Declared {@code final} so that they are real
+     * constants and cannot be reassigned at runtime.
+     */
+    public static class ConfigNames {
+
+        public static final String URL = "url";
+        public static final String USERNAME = "username";
+        public static final String PASSWORD = "password";
+        public static final String DATABASE = "database";
+        public static final String STABLE = "stable";
+        public static final String TIMEZONE = "timezone";
+        public static final String LOWER_BOUND = "lower_bound";
+        public static final String UPPER_BOUND = "upper_bound";
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java
new file mode 100644
index 000000000..77e1edac8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/**
+ * Exception type thrown by the TDengine connector. Every constructor forwards its arguments
+ * unchanged to the matching {@link SeaTunnelRuntimeException} constructor.
+ */
+public class TDengineConnectorException extends SeaTunnelRuntimeException {
+
+    /** Error code plus the underlying cause, without an extra message. */
+    public TDengineConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+
+    /** Error code plus a descriptive message. */
+    public TDengineConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    /** Error code, descriptive message and the underlying cause. */
+    public TDengineConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
new file mode 100644
index 000000000..a06cd8999
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+/**
+ * Sink plugin registered under the name "TDengine". It only stores the plugin config and the
+ * incoming row type, and hands both to a {@link TDengineSinkWriter} when a writer is requested.
+ */
+@AutoService(SeaTunnelSink.class)
+public class TDengineSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    /** Name under which this sink is looked up. */
+    @Override
+    public String getPluginName() {
+        return "TDengine";
+    }
+
+    /** Remembers the sink options; no validation happens at this point. */
+    @Override
+    public void prepare(Config pluginConfig) {
+        this.pluginConfig = pluginConfig;
+    }
+
+    /** Remembers the row type of the data this sink will receive. */
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /** Returns the row type previously passed to {@link #setTypeInfo(SeaTunnelRowType)}. */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    /** Creates a writer from the stored options and row type. */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
+        return new TDengineSinkWriter(this.pluginConfig, this.seaTunnelRowType);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
new file mode 100644
index 000000000..e6c0b5785
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+
+@Slf4j
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+ private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+ private final Connection conn;
+ private final TDengineSourceConfig config;
+ private int tagsNum;
+
+ @SneakyThrows
+ public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+ config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+ String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+ conn = DriverManager.getConnection(jdbcUrl);
+ try (Statement statement = conn.createStatement()) {
+ final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+ while (metaResultSet.next()) {
+ if (StringUtils.equals("TAG", metaResultSet.getString("note"))) {
+ tagsNum++;
+ }
+ }
+ }
+ }
+
+ @SneakyThrows
+ @Override
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ public void write(SeaTunnelRow element) {
+ final ArrayList<Object> tags = Lists.newArrayList();
+ for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+ tags.add(element.getField(i));
+ }
+ final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+ final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+ try (Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+ String sql = String.format("INSERT INTO %s using %s tags ( %s ) VALUES ( %s );",
+ element.getField(0),
+ config.getStable(),
+ tagValues,
+ StringUtils.join(convertDataType(metrics), ","));
+ final int rowCount = statement.executeUpdate(sql);
+ if (rowCount == 0) {
+ Throwables.propagateIfPossible(new TDengineConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "insert error:" + element));
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (Objects.nonNull(conn)) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ throw new TDengineConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, "TDengine writer connection close failed", e);
+ }
+ }
+ }
+
+ private Object[] convertDataType(Object[] objects) {
+ return Arrays.stream(objects)
+ .map(object -> {
+ if (LocalDateTime.class.equals(object.getClass())) {
+ //transform timezone according to the config
+ return "'" + ((LocalDateTime) object).atZone(ZoneId.systemDefault()).withZoneSameInstant(ZoneId.of(config.getTimezone())).format(FORMATTER) + "'";
+ } else if (String.class.equals(object.getClass())) {
+ return "'" + object + "'";
+ }
+ return object;
+ })
+ .toArray();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
new file mode 100644
index 000000000..a3d393d69
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.buildSourceConfig;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceReader.Context;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.List;
+
+/**
+ * TDengine source each split corresponds one subtable
+ * <p>
+ * TODO: wait for optimization
+ * 1. batch -> batch + stream
+ * 2. one item of data writing -> a batch of data writing
+ */
+@AutoService(SeaTunnelSource.class)
+public class TDengineSource implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit, TDengineSourceState> {
+
+ private SeaTunnelRowType seaTunnelRowType;
+ private TDengineSourceConfig tdengineSourceConfig;
+
+ @Override
+ public String getPluginName() {
+ return "TDengine";
+ }
+
+ @SneakyThrows
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, DATABASE, STABLE, USERNAME, PASSWORD);
+ if (!result.isSuccess()) {
+ throw new TDengineConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "TDengine connection require url/database/stable/username/password. All of these must not be empty.");
+ }
+ tdengineSourceConfig = buildSourceConfig(pluginConfig);
+
+ //add subtable_name and tags to `seaTunnelRowType`
+ SeaTunnelRowType originRowType = getSTableMetaInfo(tdengineSourceConfig);
+ seaTunnelRowType = addHiddenAttribute(originRowType);
+ }
+
+ @SneakyThrows
+ private SeaTunnelRowType getSTableMetaInfo(TDengineSourceConfig config) {
+ String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+ Connection conn = DriverManager.getConnection(jdbcUrl);
+ List<String> fieldNames = Lists.newArrayList();
+ List<SeaTunnelDataType<?>> fieldTypes = Lists.newArrayList();
+ try (Statement statement = conn.createStatement()) {
+ final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+ while (metaResultSet.next()) {
+ fieldNames.add(metaResultSet.getString(1));
+ fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
+ }
+ }
+ return new SeaTunnelRowType(fieldNames.toArray(new String[0]), fieldTypes.toArray(new SeaTunnelDataType<?>[0]));
+ }
+
+ private SeaTunnelRowType addHiddenAttribute(SeaTunnelRowType originRowType) {
+ //0-subtable_name / 1-n field_names /
+ String[] fieldNames = ArrayUtils.add(originRowType.getFieldNames(), 0, "subtable_name");
+ // n+1-> tags
+ SeaTunnelDataType<?>[] fieldTypes = ArrayUtils.add(originRowType.getFieldTypes(), 0, BasicType.STRING_TYPE);
+ return new SeaTunnelRowType(fieldNames, fieldTypes);
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return seaTunnelRowType;
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, TDengineSourceSplit> createReader(Context readerContext) {
+ return new TDengineSourceReader(tdengineSourceConfig, readerContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> createEnumerator(SourceSplitEnumerator.Context<TDengineSourceSplit> enumeratorContext) {
+ return new TDengineSourceSplitEnumerator(seaTunnelRowType, tdengineSourceConfig, enumeratorContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> restoreEnumerator(SourceSplitEnumerator.Context<TDengineSourceSplit> enumeratorContext,
+ TDengineSourceState checkpointState) {
+ return new TDengineSourceSplitEnumerator(seaTunnelRowType, tdengineSourceConfig, checkpointState, enumeratorContext);
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
new file mode 100644
index 000000000..1795b794e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import com.google.common.collect.Sets;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+@Slf4j
+public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {
+
+ private static final long THREAD_WAIT_TIME = 500L;
+
+ private final TDengineSourceConfig config;
+
+ private final Set<TDengineSourceSplit> sourceSplits;
+
+ private final Context context;
+
+ private Connection conn;
+
+ public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
+ this.config = config;
+ this.sourceSplits = Sets.newHashSet();
+ this.context = readerContext;
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
+ if (sourceSplits.isEmpty()) {
+ Thread.sleep(THREAD_WAIT_TIME);
+ return;
+ }
+ synchronized (collector.getCheckpointLock()) {
+ sourceSplits.forEach(split -> {
+ try {
+ read(split, collector);
+ } catch (Exception e) {
+ throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "TDengine split read error", e);
+ }
+ });
+ }
+
+ if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of the data.
+ log.info("Closed the bounded TDengine source");
+ context.signalNoMoreElement();
+ }
+ }
+
+ @Override
+ public void open(){
+ String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+ Properties connProps = new Properties();
+ //todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true",
+ // there is a exception : Caused by: java.sql.SQLException: can't create connection with server
+ // under docker network env
+ // @bobo (tdengine)
+ connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false");
+ try {
+ conn = DriverManager.getConnection(jdbcUrl, connProps);
+ } catch (SQLException e) {
+ throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "get TDengine connection failed:" + jdbcUrl);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (!Objects.isNull(conn)) {
+ conn.close();
+ }
+ } catch (SQLException e) {
+ throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "TDengine reader connection close failed", e);
+ }
+ }
+
+ private void read(TDengineSourceSplit split, Collector<SeaTunnelRow> output) throws Exception {
+ try (Statement statement = conn.createStatement()) {
+ final ResultSet resultSet = statement.executeQuery(split.getQuery());
+ ResultSetMetaData meta = resultSet.getMetaData();
+
+ while (resultSet.next()) {
+ Object[] datas = new Object[meta.getColumnCount() + 1];
+ datas[0] = split.splitId();
+ for (int i = 1; i <= meta.getColumnCount(); i++) {
+ datas[i] = convertDataType(resultSet.getObject(i));
+ }
+ output.collect(new SeaTunnelRow(datas));
+ }
+ }
+ }
+
+ private Object convertDataType(Object object) {
+ if (Timestamp.class.equals(object.getClass())) {
+ return ((Timestamp) object).toLocalDateTime();
+ } else if (byte[].class.equals(object.getClass())) {
+ return new String((byte[]) object);
+ }
+ return object;
+ }
+
+ @Override
+ public List<TDengineSourceSplit> snapshotState(long checkpointId) {
+ return new ArrayList<>(sourceSplits);
+ }
+
+ @Override
+ public void addSplits(List<TDengineSourceSplit> splits) {
+ sourceSplits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ // do nothing
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // do nothing
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java
new file mode 100644
index 000000000..2b8ad47f8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+/**
+ * A unit of work for the TDengine source: a split id together with the SQL statement that is
+ * executed for it.
+ */
+public class TDengineSourceSplit implements SourceSplit {
+
+    private static final long serialVersionUID = -1L;
+
+    /** identifier returned by {@link #splitId()} */
+    private String splitId;
+
+    /** final query statement */
+    private String query;
+
+    /**
+     * @param splitId identifier of this split
+     * @param query   final query statement of this split
+     */
+    public TDengineSourceSplit(String splitId, String query) {
+        this.query = query;
+        this.splitId = splitId;
+    }
+
+    /** @return the final query statement of this split */
+    public String getQuery() {
+        return this.query;
+    }
+
+    /** @return the identifier of this split */
+    @Override
+    public String splitId() {
+        return this.splitId;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
new file mode 100644
index 000000000..2a8d4656a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+
+import com.google.common.collect.Sets;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> {
+
+ private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
+ private final TDengineSourceConfig config;
+ private Set<TDengineSourceSplit> pendingSplit = new HashSet<>();
+ private Set<TDengineSourceSplit> assignedSplit = new HashSet<>();
+ private Connection conn;
+ private SeaTunnelRowType seaTunnelRowType;
+
+ public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+ this(seaTunnelRowType, config, null, context);
+ }
+
+ public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, TDengineSourceState sourceState, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+ this.config = config;
+ this.context = context;
+ this.seaTunnelRowType = seaTunnelRowType;
+ if (sourceState != null) {
+ this.assignedSplit = sourceState.getAssignedSplit();
+ }
+ }
+
+ private static int getSplitOwner(String tp, int numReaders) {
+ return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+ }
+
+ @SneakyThrows
+ @Override
+ public void open() {
+ String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+ conn = DriverManager.getConnection(jdbcUrl);
+ }
+
+ @Override
+ public void run() throws SQLException {
+ pendingSplit = getAllSplits();
+ assignSplit(context.registeredReaders());
+ }
+
+ /*
+ * 1. get timestampField
+ * 2. get all sub tables of configured super table
+ * 3. each split has one sub table
+ */
+ private Set<TDengineSourceSplit> getAllSplits() throws SQLException {
+ final String timestampFieldName;
+ try (Statement statement = conn.createStatement()) {
+ final ResultSet fieldNameResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+ fieldNameResultSet.next();
+ timestampFieldName = fieldNameResultSet.getString(1);
+ }
+
+ final Set<TDengineSourceSplit> splits = Sets.newHashSet();
+ try (Statement statement = conn.createStatement()) {
+ String metaSQL = "select table_name from information_schema.ins_tables where db_name = '" + config.getDatabase() + "' and stable_name='" + config.getStable() + "';";
+ ResultSet subTableNameResultSet = statement.executeQuery(metaSQL);
+ while (subTableNameResultSet.next()) {
+ final String subTableName = subTableNameResultSet.getString(1);
+ final TDengineSourceSplit splitBySubTable = createSplitBySubTable(subTableName, timestampFieldName);
+ splits.add(splitBySubTable);
+ }
+ }
+ return splits;
+ }
+
+ private TDengineSourceSplit createSplitBySubTable(String subTableName, String timestampFieldName) {
+ String selectFields = Arrays.stream(seaTunnelRowType.getFieldNames()).skip(1).collect(Collectors.joining(","));
+ String subTableSQL = "select " + selectFields + " from " + config.getDatabase() + "." + subTableName;
+ String start = config.getLowerBound();
+ String end = config.getUpperBound();
+ if (start != null || end != null) {
+ String startCondition = null;
+ String endCondition = null;
+ //Left closed right away
+ if (start != null) {
+ startCondition = timestampFieldName + " >= '" + start + "'";
+ }
+ if (end != null) {
+ endCondition = timestampFieldName + " < '" + end + "'";
+ }
+ String query = StringUtils.join(new String[]{startCondition, endCondition}, " and ");
+ subTableSQL = subTableSQL + " where " + query;
+ }
+
+ return new TDengineSourceSplit(subTableName, subTableSQL);
+ }
+
+ @Override
+ public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) {
+ if (!splits.isEmpty()) {
+ pendingSplit.addAll(splits);
+ assignSplit(Collections.singletonList(subtaskId));
+ }
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return pendingSplit.size();
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ if (!pendingSplit.isEmpty()) {
+ assignSplit(Collections.singletonList(subtaskId));
+ }
+ }
+
+ private void assignSplit(Collection<Integer> taskIDList) {
+ assignedSplit = pendingSplit.stream()
+ .map(split -> {
+ int splitOwner = getSplitOwner(split.splitId(), context.currentParallelism());
+ if (taskIDList.contains(splitOwner)) {
+ context.assignSplit(splitOwner, split);
+ return split;
+ } else {
+ return null;
+ }
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ pendingSplit.clear();
+ }
+
+ @Override
+ public TDengineSourceState snapshotState(long checkpointId) {
+ return new TDengineSourceState(assignedSplit);
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ SourceSplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ //nothing to do
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ SourceSplitEnumerator.super.notifyCheckpointAborted(checkpointId);
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (!Objects.isNull(conn)) {
+ conn.close();
+ }
+ } catch (SQLException e) {
+ throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "TDengine split_enumerator connection close failed", e);
+ }
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+ //nothing to do
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
new file mode 100644
index 000000000..fc839682a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.state;
+
+import org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Serializable checkpoint state of the TDengine split enumerator: the set of assigned splits.
+ */
+public class TDengineSourceState implements Serializable {
+
+    /** splits recorded as assigned when the state was created */
+    private final Set<TDengineSourceSplit> assignedSplit;
+
+    /**
+     * @param assignedSplit set of assigned splits; stored as given, without copying
+     */
+    public TDengineSourceState(Set<TDengineSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    /** @return the set passed to the constructor */
+    public Set<TDengineSourceSplit> getAssignedSplit() {
+        return this.assignedSplit;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java
new file mode 100644
index 000000000..35e50c670
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TDengineTypeMapper {
+
+
+ // ============================data types=====================
+
+ private static final String TDENGINE_UNKNOWN = "UNKNOWN";
+ private static final String TDENGINE_BIT = "BIT";
+
+ // -------------------------number----------------------------
+ private static final String TDENGINE_TINYINT = "TINYINT";
+ private static final String TDENGINE_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ private static final String TDENGINE_SMALLINT = "SMALLINT";
+ private static final String TDENGINE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ private static final String TDENGINE_MEDIUMINT = "MEDIUMINT";
+ private static final String TDENGINE_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+ private static final String TDENGINE_INT = "INT";
+ private static final String TDENGINE_INT_UNSIGNED = "INT UNSIGNED";
+ private static final String TDENGINE_INTEGER = "INTEGER";
+ private static final String TDENGINE_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+ private static final String TDENGINE_BIGINT = "BIGINT";
+ private static final String TDENGINE_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ private static final String TDENGINE_DECIMAL = "DECIMAL";
+ private static final String TDENGINE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ private static final String TDENGINE_FLOAT = "FLOAT";
+ private static final String TDENGINE_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ private static final String TDENGINE_DOUBLE = "DOUBLE";
+ private static final String TDENGINE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+ // -------------------------string----------------------------
+ private static final String TDENGINE_CHAR = "CHAR";
+ private static final String TDENGINE_VARCHAR = "VARCHAR";
+ private static final String TDENGINE_TINYTEXT = "TINYTEXT";
+ private static final String TDENGINE_MEDIUMTEXT = "MEDIUMTEXT";
+ private static final String TDENGINE_TEXT = "TEXT";
+ private static final String TDENGINE_LONGTEXT = "LONGTEXT";
+ private static final String TDENGINE_JSON = "JSON";
+
+ // ------------------------------time-------------------------
+ private static final String TDENGINE_DATE = "DATE";
+ private static final String TDENGINE_DATETIME = "DATETIME";
+ private static final String TDENGINE_TIME = "TIME";
+ private static final String TDENGINE_TIMESTAMP = "TIMESTAMP";
+ private static final String TDENGINE_YEAR = "YEAR";
+
+ // ------------------------------blob-------------------------
+ private static final String TDENGINE_TINYBLOB = "TINYBLOB";
+ private static final String TDENGINE_MEDIUMBLOB = "MEDIUMBLOB";
+ private static final String TDENGINE_BLOB = "BLOB";
+ private static final String TDENGINE_LONGBLOB = "LONGBLOB";
+ private static final String TDENGINE_BINARY = "BINARY";
+ private static final String TDENGINE_VARBINARY = "VARBINARY";
+ private static final String TDENGINE_GEOMETRY = "GEOMETRY";
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static SeaTunnelDataType<?> mapping(String tdengineType) {
+ switch (tdengineType) {
+ case TDENGINE_BIT:
+ return BasicType.BOOLEAN_TYPE;
+ case TDENGINE_TINYINT:
+ case TDENGINE_TINYINT_UNSIGNED:
+ case TDENGINE_SMALLINT:
+ case TDENGINE_SMALLINT_UNSIGNED:
+ case TDENGINE_MEDIUMINT:
+ case TDENGINE_MEDIUMINT_UNSIGNED:
+ case TDENGINE_INT:
+ case TDENGINE_INTEGER:
+ case TDENGINE_YEAR:
+ return BasicType.INT_TYPE;
+ case TDENGINE_INT_UNSIGNED:
+ case TDENGINE_INTEGER_UNSIGNED:
+ case TDENGINE_BIGINT:
+ return BasicType.LONG_TYPE;
+ case TDENGINE_BIGINT_UNSIGNED:
+ return new DecimalType(20, 0);
+ case TDENGINE_DECIMAL:
+ log.warn("{} will probably cause value overflow.", TDENGINE_DECIMAL);
+ return new DecimalType(38, 18);
+ case TDENGINE_DECIMAL_UNSIGNED:
+ return new DecimalType(38, 18);
+ case TDENGINE_FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case TDENGINE_FLOAT_UNSIGNED:
+ log.warn("{} will probably cause value overflow.", TDENGINE_FLOAT_UNSIGNED);
+ return BasicType.FLOAT_TYPE;
+ case TDENGINE_DOUBLE:
+ return BasicType.DOUBLE_TYPE;
+ case TDENGINE_DOUBLE_UNSIGNED:
+ log.warn("{} will probably cause value overflow.", TDENGINE_DOUBLE_UNSIGNED);
+ return BasicType.DOUBLE_TYPE;
+ case TDENGINE_CHAR:
+ case TDENGINE_TINYTEXT:
+ case TDENGINE_MEDIUMTEXT:
+ case TDENGINE_TEXT:
+ case TDENGINE_VARCHAR:
+ case TDENGINE_JSON:
+ case TDENGINE_LONGTEXT:
+ return BasicType.STRING_TYPE;
+ case TDENGINE_DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case TDENGINE_TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case TDENGINE_DATETIME:
+ case TDENGINE_TIMESTAMP:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+ case TDENGINE_TINYBLOB:
+ case TDENGINE_MEDIUMBLOB:
+ case TDENGINE_BLOB:
+ case TDENGINE_LONGBLOB:
+ case TDENGINE_VARBINARY:
+ case TDENGINE_BINARY:
+ return PrimitiveByteArrayType.INSTANCE;
+
+ //Doesn't support yet
+ case TDENGINE_GEOMETRY:
+ case TDENGINE_UNKNOWN:
+ default:
+ throw new TDengineConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format(
+ "Doesn't support TDENGINE type '%s' on column '%s' yet.",
+ tdengineType));
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java
new file mode 100644
index 000000000..118446193
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine;
+
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Assertions;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+public class TDengineTest {
+
+ public void testQueryUrl(String jdbcUrl) {
+ Assertions.assertDoesNotThrow(() -> {
+ try (Connection conn = getConnection(jdbcUrl)) {
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("SELECT location,AVG(voltage) FROM meters GROUP BY location;");
+ }
+ }
+ });
+ }
+
+ @SneakyThrows
+ private Connection getConnection(String jdbcUrl) {
+ Properties connProps = new Properties();
+ connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+ return DriverManager.getConnection(jdbcUrl, connProps);
+ }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index ff56c817b..bdae3668b 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -68,6 +68,7 @@
<module>connector-openmldb</module>
<module>connector-doris</module>
<module>connector-maxcompute</module>
+ <module>connector-tdengine</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index af2ad8ac0..f887959b7 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -443,6 +443,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-tdengine</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- jdbc driver -->
<dependency>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml
new file mode 100644
index 000000000..974a96be0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-tdengine-e2e</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+ <dependencies>
+ <!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-tdengine</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
new file mode 100644
index 000000000..19ef53524
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.tdengine;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test that copies rows from one TDengine server to another through a SeaTunnel job.
+ *
+ * <p>Two TDengine containers are started on the test network. The first one is seeded with the
+ * super table {@code power.meters} and the rows from {@link #getRawData()}; the second one only
+ * gets the empty super table {@code power2.meters2}. The job
+ * {@code /tdengine/tdengine_source_to_sink.conf} is then executed and the number of rows found
+ * in the sink is compared with the number of rows seeded into the source.
+ */
+@Slf4j
+public class TDengineIT extends TestSuiteBase implements TestResource {
+    private static final String DOCKER_IMAGE = "tdengine/tdengine:3.0.2.1";
+    // The aliases and the port below are referenced by the urls in tdengine_source_to_sink.conf.
+    private static final String NETWORK_ALIASES1 = "flink_e2e_tdengine_src";
+    private static final String NETWORK_ALIASES2 = "flink_e2e_tdengine_sink";
+    private static final int PORT = 6041;
+
+    private GenericContainer<?> tdengineServer1;
+    private GenericContainer<?> tdengineServer2;
+    private Connection connection1;
+    private Connection connection2;
+    private int testDataCount;
+
+    /**
+     * Starts both TDengine containers, opens one JDBC connection to each of them, waits until
+     * both connections report themselves as valid and finally seeds the test data.
+     *
+     * @throws Exception if a container cannot be started or the servers do not become usable
+     */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        tdengineServer1 = createServer(NETWORK_ALIASES1);
+        tdengineServer2 = createServer(NETWORK_ALIASES2);
+        // Both containers are independent of each other, so they are started in one go.
+        Startables.deepStart(Stream.of(tdengineServer1, tdengineServer2)).join();
+        log.info("TDengine container started");
+        connection1 = createConnect(tdengineServer1);
+        connection2 = createConnect(tdengineServer2);
+        // wait for TDengine fully start
+        given().ignoreExceptions()
+            .await()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(1, TimeUnit.SECONDS)
+            .atMost(120, TimeUnit.SECONDS)
+            .untilAsserted(() -> Assertions.assertTrue(connection1.isValid(100) && connection2.isValid(100)));
+        testDataCount = generateTestDataSet();
+        log.info("tdengine testDataCount={}", testDataCount); // rowCount=8
+    }
+
+    /**
+     * Builds (without starting) a TDengine container attached to the test network.
+     *
+     * @param networkAlias alias under which the container is reachable from the job containers
+     * @return the configured, not yet started container
+     */
+    private GenericContainer<?> createServer(String networkAlias) {
+        return new GenericContainer<>(DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(networkAlias)
+            .withExposedPorts(PORT)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)))
+            .waitingFor(new HostPortWaitStrategy()
+                .withStartupTimeout(Duration.ofMinutes(2)));
+    }
+
+    /**
+     * Creates the source database and super table and inserts the rows from {@link #getSQL()},
+     * then creates the (empty) sink database and super table on the second server.
+     *
+     * @return the update count reported by the source server for the INSERT statement
+     */
+    @SneakyThrows
+    private int generateTestDataSet() {
+        int rowCount;
+        try (Statement stmt = connection1.createStatement()) {
+            stmt.execute("CREATE DATABASE power KEEP 3650");
+            stmt.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
+                "TAGS (location BINARY(64), groupId INT)");
+            rowCount = stmt.executeUpdate(getSQL());
+        }
+        try (Statement stmt = connection2.createStatement()) {
+            stmt.execute("CREATE DATABASE power2 KEEP 3650");
+            stmt.execute("CREATE STABLE power2.meters2 (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
+                "TAGS (location BINARY(64), groupId INT)");
+        }
+        return rowCount;
+    }
+
+    /**
+     * Runs the source-to-sink job and verifies that every seeded row arrived in the sink.
+     *
+     * @param container the engine container the job is submitted to
+     * @throws Exception if the job cannot be executed
+     */
+    @TestTemplate
+    public void testTDengine(TestContainer container) throws Exception {
+        Container.ExecResult execResult = container.executeJob("/tdengine/tdengine_source_to_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        long rowCountInserted = readSinkDataset();
+        // JUnit expects (expected, actual): the seeded count is the expectation.
+        Assertions.assertEquals(testDataCount, rowCountInserted);
+    }
+
+    /**
+     * Counts the rows of the sink super table {@code power2.meters2}.
+     *
+     * @return the row count, or {@code 0} when the count query yields no row at all
+     */
+    @SneakyThrows
+    private long readSinkDataset() {
+        try (Statement stmt = connection2.createStatement();
+             ResultSet resultSet = stmt.executeQuery("select count(1) from power2.meters2;")) {
+            // Do not read a column unless the cursor really moved onto a row.
+            return resultSet.next() ? resultSet.getLong(1) : 0L;
+        }
+    }
+
+    /**
+     * Opens a JDBC connection to the given server through its mapped host port.
+     *
+     * @param tdengineServer a started TDengine container
+     * @return the open connection; the caller is responsible for closing it
+     */
+    @SneakyThrows
+    private Connection createConnect(GenericContainer<?> tdengineServer) {
+        String jdbcUrl = "jdbc:TAOS-RS://" + tdengineServer.getHost() + ":" + tdengineServer.getFirstMappedPort() + "?user=root&password=taosdata";
+        Connection conn = DriverManager.getConnection(jdbcUrl);
+        log.info("TDengine Connected! {}", jdbcUrl);
+        return conn;
+    }
+
+    /**
+     * Closes both connections and stops both containers. Every step is attempted even when an
+     * earlier one fails, so a failing {@code close()} cannot leave containers running.
+     *
+     * @throws Exception if closing a connection or stopping a container fails
+     */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        // try-with-resources skips null resources and closes each non-null one even if the
+        // other close() throws; the body is intentionally empty.
+        try (Connection ignored1 = connection1; Connection ignored2 = connection2) {
+            log.debug("Closing TDengine connections");
+        } finally {
+            try {
+                if (tdengineServer1 != null) {
+                    tdengineServer1.stop();
+                }
+            } finally {
+                if (tdengineServer2 != null) {
+                    tdengineServer2.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds one multi-table INSERT statement from {@link #getRawData()}.
+     *
+     * <p>The statement is a single line; its clauses are shown one per line here for readability:
+     * <pre>
+     * INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000)
+     * power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000)
+     * power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000)
+     * power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000)
+     * power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000)
+     * power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000)
+     * power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000)
+     * power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000)
+     * </pre>
+     *
+     * @return the INSERT statement covering every raw data row
+     */
+    private static String getSQL() {
+        StringBuilder sb = new StringBuilder("INSERT INTO ");
+        for (String line : getRawData()) {
+            // Column layout: table, ts, current, voltage, phase, location tag, groupId tag
+            String[] ps = line.split(",");
+            sb.append("power.").append(ps[0]).append(" USING power.meters TAGS(")
+                .append(ps[5]).append(", ") // tag: location
+                .append(ps[6]) // tag: groupId
+                .append(") VALUES(")
+                .append('\'').append(ps[1]).append('\'').append(",") // ts
+                .append(ps[2]).append(",") // current
+                .append(ps[3]).append(",") // voltage
+                .append(ps[4]).append(") "); // phase
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Returns the seed rows as comma separated values in the order
+     * {@code table,ts,current,voltage,phase,location,groupId}; the location is already quoted
+     * for direct use in SQL.
+     *
+     * @return the eight seed rows
+     */
+    private static List<String> getRawData() {
+        return Arrays.asList(
+            "d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,'California.SanFrancisco',2",
+            "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,'California.SanFrancisco',2",
+            "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,'California.SanFrancisco',2",
+            "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,'California.SanFrancisco',3",
+            "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,'California.LosAngeles',2",
+            "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,'California.LosAngeles',2",
+            "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,'California.LosAngeles',3",
+            "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,'California.LosAngeles',3"
+        );
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
new file mode 100644
index 000000000..803a10f83
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 2
+ job.mode = "STREAMING"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin configuration, **only for testing and demonstrating the source plugin feature**
+ TDengine {
+ url : "jdbc:TAOS-RS://flink_e2e_tdengine_src:6041/"
+ username : "root"
+ password : "taosdata"
+ database : "power"
+ stable : "meters"
+ lower_bound : "2018-10-03 14:38:05.000"
+ upper_bound : "2018-10-03 14:38:16.801"
+ result_table_name = "tdengine_result"
+ }
+ # If you would like to get more information about how to configure seatunnel and see the full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+}
+
+sink {
+ TDengine {
+ url : "jdbc:TAOS-RS://flink_e2e_tdengine_sink:6041/"
+ username : "root"
+ password : "taosdata"
+ database : "power2"
+ stable : "meters2"
+ timezone : "UTC"
+ }
+ # If you would like to get more information about how to configure seatunnel and see the full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 466e02041..457217735 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -45,6 +45,7 @@
<module>connector-cdc-mysql-e2e</module>
<module>connector-iceberg-e2e</module>
<module>connector-iceberg-hadoop3-e2e</module>
+ <module>connector-tdengine-e2e</module>
<module>connector-datahub-e2e</module>
</modules>
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
index 6fa2aa94a..353ada932 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
@@ -123,7 +123,9 @@ public class ParallelBatchPartitionReader {
public void close() throws IOException {
running = false;
try {
- internalSource.close();
+ if (internalSource != null) {
+ internalSource.close();
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}