You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2023/01/10 02:06:15 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] add tdengine source (#2832)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new acf4d5b1b [Feature][Connector-V2] add tdengine source (#2832)
acf4d5b1b is described below

commit acf4d5b1b469adf383b06266ef2e9815f472cd25
Author: Li Hongyu <lh...@gmail.com>
AuthorDate: Tue Jan 10 10:06:09 2023 +0800

    [Feature][Connector-V2] add tdengine source (#2832)
    
    * [Feature][Connector-V2]add tdengine source and sink
    1. add ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY to statement
    2. add tdengine e2e module
    
    
    * SourceSplitEnumerator.Context
    
    Co-authored-by: bjyflihongyu <li...@jd.com>
    Co-authored-by: tyrantlucifer <ty...@gmail.com>
---
 docs/en/connector-v2/sink/TDengine.md              |  69 +++++++
 docs/en/connector-v2/source/TDengine.md            |  82 +++++++++
 plugin-mapping.properties                          |   3 +
 seatunnel-connectors-v2/connector-tdengine/pom.xml |  45 +++++
 .../tdengine/config/TDengineSourceConfig.java      |  79 ++++++++
 .../exception/TDengineConnectorException.java      |  36 ++++
 .../seatunnel/tdengine/sink/TDengineSink.java      |  63 +++++++
 .../tdengine/sink/TDengineSinkWriter.java          | 120 ++++++++++++
 .../seatunnel/tdengine/source/TDengineSource.java  | 141 ++++++++++++++
 .../tdengine/source/TDengineSourceReader.java      | 159 ++++++++++++++++
 .../tdengine/source/TDengineSourceSplit.java       |  46 +++++
 .../source/TDengineSourceSplitEnumerator.java      | 204 +++++++++++++++++++++
 .../tdengine/state/TDengineSourceState.java        |  36 ++++
 .../tdengine/typemapper/TDengineTypeMapper.java    | 153 ++++++++++++++++
 .../seatunnel/tdengine/TDengineTest.java           |  50 +++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   6 +
 .../connector-tdengine-e2e/pom.xml                 |  47 +++++
 .../e2e/connector/tdengine/TDengineIT.java         | 198 ++++++++++++++++++++
 .../tdengine/tdengine_source_to_sink.conf          |  59 ++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 .../source/batch/ParallelBatchPartitionReader.java |   4 +-
 22 files changed, 1601 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/sink/TDengine.md b/docs/en/connector-v2/sink/TDengine.md
new file mode 100644
index 000000000..597a7a99b
--- /dev/null
+++ b/docs/en/connector-v2/sink/TDengine.md
@@ -0,0 +1,69 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine. You need to create the stable before running the SeaTunnel task.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| timezone                     | string  | no      | UTC            |
+
+### url [string] 
+
+the JDBC url used to connect to TDengine; the database name is appended to it
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### timezone [string]
+
+the timezone of the TDengine server; it is important for the ts field
+
+## Example
+
+### sink
+
+```hocon
+sink {
+        TDengine {
+          url : "jdbc:TAOS-RS://localhost:6041/"
+          username : "root"
+          password : "taosdata"
+          database : "power2"
+          stable : "meters2"
+          timezone: UTC
+        }
+}
+```
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/TDengine.md b/docs/en/connector-v2/source/TDengine.md
new file mode 100644
index 000000000..fea0a080d
--- /dev/null
+++ b/docs/en/connector-v2/source/TDengine.md
@@ -0,0 +1,82 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Used to read data from TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| lower_bound                | string  | yes       | -             |
+| upper_bound                | string  | yes       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### lower_bound [string]
+
+the lower bound of the migration period, e.g. `2018-10-03 14:38:05.000`
+
+### upper_bound [string]
+
+the upper bound of the migration period, e.g. `2018-10-03 14:38:16.800`
+
+## Example
+
+### source
+```hocon
+source {
+        TDengine {
+          url : "jdbc:TAOS-RS://localhost:6041/"
+          username : "root"
+          password : "taosdata"
+          database : "power"
+          stable : "meters"
+          lower_bound : "2018-10-03 14:38:05.000"
+          upper_bound : "2018-10-03 14:38:16.800"
+          result_table_name = "tdengine_result"
+        }
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c12aa0ec7..8e55c397c 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -166,4 +166,7 @@ seatunnel.source.Maxcompute = connector-maxcompute
 seatunnel.sink.Maxcompute = connector-maxcompute
 seatunnel.source.MySQL-CDC = connector-cdc-mysql
 seatunnel.sink.S3Redshift = connector-s3-redshift
+seatunnel.source.TDengine = connector-tdengine
+seatunnel.sink.TDengine = connector-tdengine
 seatunnel.source.Persistiq = connector-http-persistiq
+
diff --git a/seatunnel-connectors-v2/connector-tdengine/pom.xml b/seatunnel-connectors-v2/connector-tdengine/pom.xml
new file mode 100644
index 000000000..31db6a553
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>seatunnel-connectors-v2</artifactId>
+    <version>${revision}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>connector-tdengine</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.seatunnel</groupId>
+      <artifactId>connector-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.taosdata.jdbc</groupId>
+      <artifactId>taos-jdbcdriver</artifactId>
+      <version>3.0.3</version>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
new file mode 100644
index 000000000..4bd7e1c60
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Options used by the TDengine connector to reach a server and select the data to move.
+ * Instances are built from the plugin {@link Config} via {@link #buildSourceConfig(Config)}.
+ */
+@Data
+public class TDengineSourceConfig implements Serializable {
+
+    /**
+     * JDBC url prefix, e.g. jdbc:TAOS-RS://localhost:6041/
+     */
+    private String url;
+    private String username;
+    private String password;
+    private String database;
+    private String stable;
+    // The timezone parameter of a 'jdbc:TAOS-RS' url only takes effect on the taosadapter side,
+    // not on the JDBC client side, so this option describes the server-side timezone setting.
+    private String timezone;
+    private String lowerBound;
+    private String upperBound;
+    // fields and tags are not populated by buildSourceConfig.
+    private List<String> fields;
+    private List<String> tags;
+
+    /**
+     * Builds a config from the plugin options.
+     * Every missing option is left as {@code null}, except {@code timezone} which defaults to "UTC".
+     *
+     * @param pluginConfig the connector section of the job configuration
+     * @return a new config holding the option values found in {@code pluginConfig}
+     */
+    public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
+        TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
+        tdengineSourceConfig.setUrl(getStringOrDefault(pluginConfig, URL, null));
+        tdengineSourceConfig.setDatabase(getStringOrDefault(pluginConfig, DATABASE, null));
+        tdengineSourceConfig.setStable(getStringOrDefault(pluginConfig, STABLE, null));
+        tdengineSourceConfig.setUsername(getStringOrDefault(pluginConfig, USERNAME, null));
+        tdengineSourceConfig.setPassword(getStringOrDefault(pluginConfig, PASSWORD, null));
+        tdengineSourceConfig.setUpperBound(getStringOrDefault(pluginConfig, UPPER_BOUND, null));
+        tdengineSourceConfig.setLowerBound(getStringOrDefault(pluginConfig, LOWER_BOUND, null));
+        tdengineSourceConfig.setTimezone(getStringOrDefault(pluginConfig, TIMEZONE, "UTC"));
+        return tdengineSourceConfig;
+    }
+
+    /**
+     * Returns the string stored at {@code path}, or {@code defaultValue} when the path is absent.
+     */
+    private static String getStringOrDefault(Config pluginConfig, String path, String defaultValue) {
+        return pluginConfig.hasPath(path) ? pluginConfig.getString(path) : defaultValue;
+    }
+
+    /**
+     * Names of the options read from the plugin configuration.
+     * The fields are final so the shared option keys cannot be reassigned at runtime.
+     */
+    public static class ConfigNames {
+
+        public static final String URL = "url";
+        public static final String USERNAME = "username";
+        public static final String PASSWORD = "password";
+        public static final String DATABASE = "database";
+        public static final String STABLE = "stable";
+        public static final String TIMEZONE = "timezone";
+        public static final String LOWER_BOUND = "lower_bound";
+        public static final String UPPER_BOUND = "upper_bound";
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java
new file mode 100644
index 000000000..77e1edac8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/**
+ * Runtime exception raised by the TDengine connector; every constructor delegates to
+ * {@link SeaTunnelRuntimeException}.
+ */
+public class TDengineConnectorException extends SeaTunnelRuntimeException {
+
+    /**
+     * @param errorCode the error code describing the failure category
+     * @param cause     the underlying failure
+     */
+    public TDengineConnectorException(SeaTunnelErrorCode errorCode, Throwable cause) {
+        super(errorCode, cause);
+    }
+
+    /**
+     * @param errorCode the error code describing the failure category
+     * @param message   a description of what went wrong
+     */
+    public TDengineConnectorException(SeaTunnelErrorCode errorCode, String message) {
+        super(errorCode, message);
+    }
+
+    /**
+     * @param errorCode the error code describing the failure category
+     * @param message   a description of what went wrong
+     * @param cause     the underlying failure
+     */
+    public TDengineConnectorException(SeaTunnelErrorCode errorCode, String message, Throwable cause) {
+        super(errorCode, message, cause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
new file mode 100644
index 000000000..a06cd8999
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+/**
+ * Sink entry point for the "TDengine" plugin. It stores the plugin configuration and the
+ * incoming row type, and hands both to a new {@link TDengineSinkWriter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class TDengineSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private SeaTunnelRowType seaTunnelRowType;
+    private Config pluginConfig;
+
+    @Override
+    public String getPluginName() {
+        return "TDengine";
+    }
+
+    /**
+     * Keeps the plugin configuration for later use by {@link #createWriter(SinkWriter.Context)}.
+     */
+    @Override
+    public void prepare(Config pluginConfig) {
+        this.pluginConfig = pluginConfig;
+    }
+
+    /**
+     * Records the row type of the data this sink receives.
+     */
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    /**
+     * Creates a writer from the stored configuration and row type; the context is not used.
+     */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
+        final TDengineSinkWriter writer = new TDengineSinkWriter(pluginConfig, seaTunnelRowType);
+        return writer;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
new file mode 100644
index 000000000..e6c0b5785
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+
+@Slf4j
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        conn = DriverManager.getConnection(jdbcUrl);
+        try (Statement statement = conn.createStatement()) {
+            final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+            while (metaResultSet.next()) {
+                if (StringUtils.equals("TAG", metaResultSet.getString("note"))) {
+                    tagsNum++;
+                }
+            }
+        }
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            String sql = String.format("INSERT INTO %s using %s tags ( %s ) VALUES ( %s );",
+                element.getField(0),
+                config.getStable(),
+                tagValues,
+                StringUtils.join(convertDataType(metrics), ","));
+            final int rowCount = statement.executeUpdate(sql);
+            if (rowCount == 0) {
+                Throwables.propagateIfPossible(new TDengineConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "insert error:" + element));
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (Objects.nonNull(conn)) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                throw new TDengineConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, "TDengine writer connection close failed", e);
+            }
+        }
+    }
+
+    private Object[] convertDataType(Object[] objects) {
+        return Arrays.stream(objects)
+            .map(object -> {
+                if (LocalDateTime.class.equals(object.getClass())) {
+                    //transform timezone according to the config
+                    return "'" + ((LocalDateTime) object).atZone(ZoneId.systemDefault()).withZoneSameInstant(ZoneId.of(config.getTimezone())).format(FORMATTER) + "'";
+                } else if (String.class.equals(object.getClass())) {
+                    return "'" + object + "'";
+                }
+                return object;
+            })
+            .toArray();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
new file mode 100644
index 000000000..a3d393d69
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.buildSourceConfig;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceReader.Context;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.List;
+
+/**
+ * TDengine source each split corresponds one subtable
+ * <p>
+ * TODO: wait for optimization
+ * 1. batch -> batch + stream
+ * 2. one item of data writing -> a batch of data writing
+ */
+@AutoService(SeaTunnelSource.class)
+public class TDengineSource implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit, TDengineSourceState> {
+
+    private SeaTunnelRowType seaTunnelRowType;
+    private TDengineSourceConfig tdengineSourceConfig;
+
+    @Override
+    public String getPluginName() {
+        return "TDengine";
+    }
+
+    @SneakyThrows
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, DATABASE, STABLE, USERNAME, PASSWORD);
+        if (!result.isSuccess()) {
+            throw new TDengineConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "TDengine connection require url/database/stable/username/password. All of these must not be empty.");
+        }
+        tdengineSourceConfig = buildSourceConfig(pluginConfig);
+
+        //add subtable_name and tags to `seaTunnelRowType`
+        SeaTunnelRowType originRowType = getSTableMetaInfo(tdengineSourceConfig);
+        seaTunnelRowType = addHiddenAttribute(originRowType);
+    }
+
+    @SneakyThrows
+    private SeaTunnelRowType getSTableMetaInfo(TDengineSourceConfig config) {
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Connection conn = DriverManager.getConnection(jdbcUrl);
+        List<String> fieldNames = Lists.newArrayList();
+        List<SeaTunnelDataType<?>> fieldTypes = Lists.newArrayList();
+        try (Statement statement = conn.createStatement()) {
+            final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+            while (metaResultSet.next()) {
+                fieldNames.add(metaResultSet.getString(1));
+                fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
+            }
+        }
+        return new SeaTunnelRowType(fieldNames.toArray(new String[0]), fieldTypes.toArray(new SeaTunnelDataType<?>[0]));
+    }
+
+    private SeaTunnelRowType addHiddenAttribute(SeaTunnelRowType originRowType) {
+        //0-subtable_name / 1-n field_names /
+        String[] fieldNames = ArrayUtils.add(originRowType.getFieldNames(), 0, "subtable_name");
+        // n+1-> tags
+        SeaTunnelDataType<?>[] fieldTypes = ArrayUtils.add(originRowType.getFieldTypes(), 0, BasicType.STRING_TYPE);
+        return new SeaTunnelRowType(fieldNames, fieldTypes);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return seaTunnelRowType;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, TDengineSourceSplit> createReader(Context readerContext) {
+        return new TDengineSourceReader(tdengineSourceConfig, readerContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> createEnumerator(SourceSplitEnumerator.Context<TDengineSourceSplit> enumeratorContext) {
+        return new TDengineSourceSplitEnumerator(seaTunnelRowType, tdengineSourceConfig, enumeratorContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> restoreEnumerator(SourceSplitEnumerator.Context<TDengineSourceSplit> enumeratorContext,
+        TDengineSourceState checkpointState) {
+        return new TDengineSourceSplitEnumerator(seaTunnelRowType, tdengineSourceConfig, checkpointState, enumeratorContext);
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
new file mode 100644
index 000000000..1795b794e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import com.google.common.collect.Sets;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Source reader that runs the query carried by each assigned {@link TDengineSourceSplit}
+ * over a single JDBC connection and emits every result row as a {@link SeaTunnelRow}
+ * whose first field is the split id, followed by the selected columns.
+ */
+@Slf4j
+public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {
+
+    /** Milliseconds slept by {@code pollNext} while no split has been assigned yet. */
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final TDengineSourceConfig config;
+
+    /** Splits handed to this reader through {@code addSplits}. */
+    private final Set<TDengineSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    /** JDBC connection created in {@code open()} and released in {@code close()}. */
+    private Connection conn;
+
+    public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
+        this.config = config;
+        this.sourceSplits = Sets.newHashSet();
+        this.context = readerContext;
+    }
+
+    /**
+     * Reads every assigned split while holding the collector's checkpoint lock and,
+     * when the context reports a bounded source, signals that no more elements follow.
+     * Sleeps briefly and returns when no split has been assigned yet.
+     *
+     * @param collector receiver of the produced rows
+     * @throws InterruptedException if the waiting sleep is interrupted
+     * @throws TDengineConnectorException if reading any split fails
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        synchronized (collector.getCheckpointLock()) {
+            for (TDengineSourceSplit split : sourceSplits) {
+                try {
+                    read(split, collector);
+                } catch (Exception e) {
+                    throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "TDengine split read error", e);
+                }
+            }
+        }
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded TDengine source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    /**
+     * Opens the JDBC connection used by this reader.
+     *
+     * @throws TDengineConnectorException if the connection cannot be established
+     */
+    @Override
+    public void open(){
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        //todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true",
+        // there is a exception : Caused by: java.sql.SQLException: can't create connection with server
+        // under docker network env
+        // @bobo (tdengine)
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false");
+        try {
+            conn = DriverManager.getConnection(jdbcUrl, connProps);
+        } catch (SQLException e) {
+            // Report only url + database: the full JDBC url embeds the password and must
+            // not end up in logs. The original SQLException is kept as the cause.
+            throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED,
+                "get TDengine connection failed:" + config.getUrl() + config.getDatabase(), e);
+        }
+    }
+
+    /**
+     * Closes the JDBC connection if one was opened.
+     *
+     * @throws TDengineConnectorException if closing the connection fails
+     */
+    @Override
+    public void close() {
+        try {
+            if (!Objects.isNull(conn)) {
+                conn.close();
+            }
+        } catch (SQLException e) {
+            throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "TDengine reader connection close failed", e);
+        }
+    }
+
+    /**
+     * Executes the split's query and emits one row per result record. Field 0 of each
+     * row is the split id; fields 1..n are the converted column values.
+     *
+     * @param split  split whose query is executed
+     * @param output receiver of the produced rows
+     * @throws Exception if the query or the result iteration fails
+     */
+    private void read(TDengineSourceSplit split, Collector<SeaTunnelRow> output) throws Exception {
+        // Both the statement and the result set are released, also on failure.
+        try (Statement statement = conn.createStatement();
+             ResultSet resultSet = statement.executeQuery(split.getQuery())) {
+            final ResultSetMetaData meta = resultSet.getMetaData();
+            // The column count is fixed for the whole result set; read it once.
+            final int columnCount = meta.getColumnCount();
+
+            while (resultSet.next()) {
+                Object[] datas = new Object[columnCount + 1];
+                datas[0] = split.splitId();
+                for (int i = 1; i <= columnCount; i++) {
+                    datas[i] = convertDataType(resultSet.getObject(i));
+                }
+                output.collect(new SeaTunnelRow(datas));
+            }
+        }
+    }
+
+    /**
+     * Converts a JDBC value into the representation placed into the row:
+     * {@link Timestamp} becomes a {@code LocalDateTime}, {@code byte[]} becomes a
+     * UTF-8 decoded {@code String}; any other value, including {@code null}
+     * (SQL NULL), is returned unchanged.
+     *
+     * @param object value obtained from {@code ResultSet.getObject}, may be {@code null}
+     * @return the converted value, or {@code null} when the input is {@code null}
+     */
+    private Object convertDataType(Object object) {
+        if (object == null) {
+            // SQL NULL: nothing to convert, and getClass() would throw.
+            return null;
+        }
+        if (object instanceof Timestamp) {
+            return ((Timestamp) object).toLocalDateTime();
+        } else if (object instanceof byte[]) {
+            // Decode with a fixed charset instead of the JVM's platform default.
+            return new String((byte[]) object, java.nio.charset.StandardCharsets.UTF_8);
+        }
+        return object;
+    }
+
+    /**
+     * Returns a copy of the splits currently held by this reader.
+     *
+     * @param checkpointId id of the checkpoint being taken (unused)
+     * @return a new list containing the assigned splits
+     */
+    @Override
+    public List<TDengineSourceSplit> snapshotState(long checkpointId) {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    /**
+     * Adds the given splits to the set read by {@code pollNext}.
+     *
+     * @param splits splits assigned to this reader
+     */
+    @Override
+    public void addSplits(List<TDengineSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        // do nothing
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        // do nothing
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java
new file mode 100644
index 000000000..2b8ad47f8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplit.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+/**
+ * A unit of work for the TDengine source: an identifier plus the complete query
+ * statement to execute for that unit.
+ */
+public class TDengineSourceSplit implements SourceSplit {
+
+    private static final long serialVersionUID = -1L;
+
+    /** Identifier returned by {@link #splitId()}. */
+    private String splitId;
+
+    /**
+     * final query statement
+     */
+    private String query;
+
+    /**
+     * @param splitId identifier of this split
+     * @param query   final query statement executed for this split
+     */
+    public TDengineSourceSplit(String splitId, String query) {
+        this.query = query;
+        this.splitId = splitId;
+    }
+
+    /**
+     * @return the final query statement of this split
+     */
+    public String getQuery() {
+        return this.query;
+    }
+
+    /**
+     * @return the identifier of this split
+     */
+    @Override
+    public String splitId() {
+        return this.splitId;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
new file mode 100644
index 000000000..2a8d4656a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+
+import com.google.common.collect.Sets;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> {
+
+    private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
+    private final TDengineSourceConfig config;
+    private Set<TDengineSourceSplit> pendingSplit = new HashSet<>();
+    private Set<TDengineSourceSplit> assignedSplit = new HashSet<>();
+    private Connection conn;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this(seaTunnelRowType, config, null, context);
+    }
+
+    public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, TDengineSourceState sourceState, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this.config = config;
+        this.context = context;
+        this.seaTunnelRowType = seaTunnelRowType;
+        if (sourceState != null) {
+            this.assignedSplit = sourceState.getAssignedSplit();
+        }
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    @SneakyThrows
+    @Override
+    public void open() {
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        conn = DriverManager.getConnection(jdbcUrl);
+    }
+
+    @Override
+    public void run() throws SQLException {
+        pendingSplit = getAllSplits();
+        assignSplit(context.registeredReaders());
+    }
+
+    /*
+     * 1. get timestampField
+     * 2. get all sub tables of configured super table
+     * 3. each split has one sub table
+     */
+    private Set<TDengineSourceSplit> getAllSplits() throws SQLException {
+        final String timestampFieldName;
+        try (Statement statement = conn.createStatement()) {
+            final ResultSet fieldNameResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+            fieldNameResultSet.next();
+            timestampFieldName = fieldNameResultSet.getString(1);
+        }
+
+        final Set<TDengineSourceSplit> splits = Sets.newHashSet();
+        try (Statement statement = conn.createStatement()) {
+            String metaSQL = "select table_name from information_schema.ins_tables where db_name = '" + config.getDatabase() + "' and stable_name='" + config.getStable() + "';";
+            ResultSet subTableNameResultSet = statement.executeQuery(metaSQL);
+            while (subTableNameResultSet.next()) {
+                final String subTableName = subTableNameResultSet.getString(1);
+                final TDengineSourceSplit splitBySubTable = createSplitBySubTable(subTableName, timestampFieldName);
+                splits.add(splitBySubTable);
+            }
+        }
+        return splits;
+    }
+
+    private TDengineSourceSplit createSplitBySubTable(String subTableName, String timestampFieldName) {
+        String selectFields = Arrays.stream(seaTunnelRowType.getFieldNames()).skip(1).collect(Collectors.joining(","));
+        String subTableSQL = "select " + selectFields + " from " + config.getDatabase() + "." + subTableName;
+        String start = config.getLowerBound();
+        String end = config.getUpperBound();
+        if (start != null || end != null) {
+            String startCondition = null;
+            String endCondition = null;
+            //Left closed right away
+            if (start != null) {
+                startCondition = timestampFieldName + " >= '" + start + "'";
+            }
+            if (end != null) {
+                endCondition = timestampFieldName + " < '" + end + "'";
+            }
+            String query = StringUtils.join(new String[]{startCondition, endCondition}, " and ");
+            subTableSQL = subTableSQL + " where " + query;
+        }
+
+        return new TDengineSourceSplit(subTableName, subTableSQL);
+    }
+
+    @Override
+    public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> taskIDList) {
+        assignedSplit = pendingSplit.stream()
+                .map(split -> {
+                    int splitOwner = getSplitOwner(split.splitId(), context.currentParallelism());
+                    if (taskIDList.contains(splitOwner)) {
+                        context.assignSplit(splitOwner, split);
+                        return split;
+                    } else {
+                        return null;
+                    }
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        pendingSplit.clear();
+    }
+
+    @Override
+    public TDengineSourceState snapshotState(long checkpointId) {
+        return new TDengineSourceState(assignedSplit);
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        SourceSplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        //nothing to do
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        SourceSplitEnumerator.super.notifyCheckpointAborted(checkpointId);
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (!Objects.isNull(conn)) {
+                conn.close();
+            }
+        } catch (SQLException e) {
+            throw new TDengineConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "TDengine split_enumerator connection close failed", e);
+        }
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        //nothing to do
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
new file mode 100644
index 000000000..fc839682a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.state;
+
+import org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Serializable state holder for the TDengine split enumerator: it carries the set of
+ * splits passed to its constructor.
+ */
+public class TDengineSourceState implements Serializable {
+
+    /** The set handed to the constructor; stored as-is, without copying. */
+    private final Set<TDengineSourceSplit> assignedSplit;
+
+    /**
+     * @param assignedSplit set of splits to keep in this state
+     */
+    public TDengineSourceState(Set<TDengineSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    /**
+     * @return the same set instance that was given to the constructor
+     */
+    public Set<TDengineSourceSplit> getAssignedSplit() {
+        return this.assignedSplit;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java
new file mode 100644
index 000000000..35e50c670
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TDengineTypeMapper {
+
+
+    // ============================data types=====================
+
+    private static final String TDENGINE_UNKNOWN = "UNKNOWN";
+    private static final String TDENGINE_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    private static final String TDENGINE_TINYINT = "TINYINT";
+    private static final String TDENGINE_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String TDENGINE_SMALLINT = "SMALLINT";
+    private static final String TDENGINE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String TDENGINE_MEDIUMINT = "MEDIUMINT";
+    private static final String TDENGINE_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String TDENGINE_INT = "INT";
+    private static final String TDENGINE_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String TDENGINE_INTEGER = "INTEGER";
+    private static final String TDENGINE_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String TDENGINE_BIGINT = "BIGINT";
+    private static final String TDENGINE_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String TDENGINE_DECIMAL = "DECIMAL";
+    private static final String TDENGINE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String TDENGINE_FLOAT = "FLOAT";
+    private static final String TDENGINE_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String TDENGINE_DOUBLE = "DOUBLE";
+    private static final String TDENGINE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    private static final String TDENGINE_CHAR = "CHAR";
+    private static final String TDENGINE_VARCHAR = "VARCHAR";
+    private static final String TDENGINE_TINYTEXT = "TINYTEXT";
+    private static final String TDENGINE_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String TDENGINE_TEXT = "TEXT";
+    private static final String TDENGINE_LONGTEXT = "LONGTEXT";
+    private static final String TDENGINE_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String TDENGINE_DATE = "DATE";
+    private static final String TDENGINE_DATETIME = "DATETIME";
+    private static final String TDENGINE_TIME = "TIME";
+    private static final String TDENGINE_TIMESTAMP = "TIMESTAMP";
+    private static final String TDENGINE_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    private static final String TDENGINE_TINYBLOB = "TINYBLOB";
+    private static final String TDENGINE_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String TDENGINE_BLOB = "BLOB";
+    private static final String TDENGINE_LONGBLOB = "LONGBLOB";
+    private static final String TDENGINE_BINARY = "BINARY";
+    private static final String TDENGINE_VARBINARY = "VARBINARY";
+    private static final String TDENGINE_GEOMETRY = "GEOMETRY";
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static SeaTunnelDataType<?> mapping(String tdengineType) {
+        switch (tdengineType) {
+            case TDENGINE_BIT:
+                return BasicType.BOOLEAN_TYPE;
+            case TDENGINE_TINYINT:
+            case TDENGINE_TINYINT_UNSIGNED:
+            case TDENGINE_SMALLINT:
+            case TDENGINE_SMALLINT_UNSIGNED:
+            case TDENGINE_MEDIUMINT:
+            case TDENGINE_MEDIUMINT_UNSIGNED:
+            case TDENGINE_INT:
+            case TDENGINE_INTEGER:
+            case TDENGINE_YEAR:
+                return BasicType.INT_TYPE;
+            case TDENGINE_INT_UNSIGNED:
+            case TDENGINE_INTEGER_UNSIGNED:
+            case TDENGINE_BIGINT:
+                return BasicType.LONG_TYPE;
+            case TDENGINE_BIGINT_UNSIGNED:
+                return new DecimalType(20, 0);
+            case TDENGINE_DECIMAL:
+                log.warn("{} will probably cause value overflow.", TDENGINE_DECIMAL);
+                return new DecimalType(38, 18);
+            case TDENGINE_DECIMAL_UNSIGNED:
+                return new DecimalType(38, 18);
+            case TDENGINE_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case TDENGINE_FLOAT_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", TDENGINE_FLOAT_UNSIGNED);
+                return BasicType.FLOAT_TYPE;
+            case TDENGINE_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case TDENGINE_DOUBLE_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", TDENGINE_DOUBLE_UNSIGNED);
+                return BasicType.DOUBLE_TYPE;
+            case TDENGINE_CHAR:
+            case TDENGINE_TINYTEXT:
+            case TDENGINE_MEDIUMTEXT:
+            case TDENGINE_TEXT:
+            case TDENGINE_VARCHAR:
+            case TDENGINE_JSON:
+            case TDENGINE_LONGTEXT:
+                return BasicType.STRING_TYPE;
+            case TDENGINE_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TDENGINE_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case TDENGINE_DATETIME:
+            case TDENGINE_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+            case TDENGINE_TINYBLOB:
+            case TDENGINE_MEDIUMBLOB:
+            case TDENGINE_BLOB:
+            case TDENGINE_LONGBLOB:
+            case TDENGINE_VARBINARY:
+            case TDENGINE_BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+
+            //Doesn't support yet
+            case TDENGINE_GEOMETRY:
+            case TDENGINE_UNKNOWN:
+            default:
+                throw new TDengineConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format(
+                        "Doesn't support TDENGINE type '%s' on column '%s'  yet.",
+                        tdengineType));
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java
new file mode 100644
index 000000000..118446193
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/TDengineTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine;
+
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Assertions;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+public class TDengineTest {
+
+    public void testQueryUrl(String jdbcUrl) {
+        Assertions.assertDoesNotThrow(() -> {
+            try (Connection conn = getConnection(jdbcUrl)) {
+                try (Statement stmt = conn.createStatement()) {
+                    ResultSet rs = stmt.executeQuery("SELECT location,AVG(voltage) FROM meters GROUP BY location;");
+                }
+            }
+        });
+    }
+
+    @SneakyThrows
+    private Connection getConnection(String jdbcUrl) {
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+        return DriverManager.getConnection(jdbcUrl, connProps);
+    }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index ff56c817b..bdae3668b 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -68,6 +68,7 @@
         <module>connector-openmldb</module>
         <module>connector-doris</module>
         <module>connector-maxcompute</module>
+        <module>connector-tdengine</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index af2ad8ac0..f887959b7 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -443,6 +443,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-tdengine</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
 
                 <!-- jdbc driver -->
                 <dependency>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml
new file mode 100644
index 000000000..974a96be0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-tdengine-e2e</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+    <dependencies>
+        <!-- SeaTunnel connectors -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-tdengine</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
new file mode 100644
index 000000000..19ef53524
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.tdengine;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class TDengineIT extends TestSuiteBase implements TestResource {
+    private static final String DOCKER_IMAGE = "tdengine/tdengine:3.0.2.1";
+    private static final String NETWORK_ALIASES1 = "flink_e2e_tdengine_src";
+    private static final String NETWORK_ALIASES2 = "flink_e2e_tdengine_sink";
+    private static final int PORT = 6041;
+
+    private GenericContainer<?> tdengineServer1;
+    private GenericContainer<?> tdengineServer2;
+    private Connection connection1;
+    private Connection connection2;
+    private int testDataCount;
+
+    /**
+     * Starts the source and sink TDengine containers, opens one JDBC connection to each,
+     * waits until both connections report themselves valid and then seeds the test data.
+     *
+     * @throws Exception if a container cannot be started or the test data cannot be created
+     */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        tdengineServer1 = createTDengineContainer(NETWORK_ALIASES1);
+        tdengineServer2 = createTDengineContainer(NETWORK_ALIASES2);
+        // Hand both containers to a single deepStart call so they boot concurrently
+        // instead of one after the other.
+        Startables.deepStart(Stream.of(tdengineServer1, tdengineServer2)).join();
+        log.info("TDengine container started");
+        connection1 = createConnect(tdengineServer1);
+        connection2 = createConnect(tdengineServer2);
+        // Wait until TDengine has fully started. Exceptions thrown while polling are ignored,
+        // so '&&' is sufficient: the second connection is only probed once the first is valid.
+        // NOTE(review): JDBC defines the isValid argument in seconds, so 100 is a 100 s
+        // per-connection timeout - confirm that this is intended.
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .atMost(120, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assertions.assertTrue(connection1.isValid(100) && connection2.isValid(100)));
+        testDataCount = generateTestDataSet();
+        log.info("tdengine testDataCount=" + testDataCount); // rowCount=8
+    }
+
+    /**
+     * Builds (but does not start) a TDengine container attached to the shared test network.
+     *
+     * @param networkAlias host name under which the container is reachable inside the network
+     * @return the configured, not yet started container
+     */
+    private GenericContainer<?> createTDengineContainer(String networkAlias) {
+        return new GenericContainer<>(DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(networkAlias)
+                .withExposedPorts(PORT)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)))
+                .waitingFor(new HostPortWaitStrategy()
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+    }
+
+    @SneakyThrows
+    private int generateTestDataSet() {
+        int rowCount;
+        try (Statement stmt = connection1.createStatement()) {
+            stmt.execute("CREATE DATABASE power KEEP 3650");
+            stmt.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
+                    "TAGS (location BINARY(64), groupId INT)");
+            String sql = getSQL();
+            rowCount = stmt.executeUpdate(sql);
+
+        }
+        try (Statement stmt = connection2.createStatement()) {
+            stmt.execute("CREATE DATABASE power2 KEEP 3650");
+            stmt.execute("CREATE STABLE power2.meters2 (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
+                    "TAGS (location BINARY(64), groupId INT)");
+        }
+        return rowCount;
+    }
+
+    /**
+     * Runs the source-to-sink job on the given engine container and checks that the job
+     * exits with code 0 and that the sink holds as many rows as were inserted into the source.
+     *
+     * @param container engine container that executes the job
+     * @throws Exception if the job cannot be executed
+     */
+    @TestTemplate
+    public void testTDengine(TestContainer container) throws Exception {
+        Container.ExecResult execResult = container.executeJob("/tdengine/tdengine_source_to_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        long rowCountInserted = readSinkDataset();
+        // assertEquals takes (expected, actual): the seeded row count is the expectation and
+        // the sink count is the observed value, so failure messages read the right way round.
+        Assertions.assertEquals(testDataCount, rowCountInserted);
+    }
+
+    /**
+     * Counts the rows currently stored in the sink super table {@code power2.meters2}.
+     *
+     * @return the count reported by the sink, or {@code 0} when the count query yields no row
+     */
+    @SneakyThrows
+    private long readSinkDataset() {
+        // Statement and result set are both declared as resources so neither is leaked.
+        try (Statement stmt = connection2.createStatement();
+             ResultSet resultSet = stmt.executeQuery("select count(1) from power2.meters2;")) {
+            // NOTE(review): presumably the server may answer with an empty result set when the
+            // table holds no rows - confirm. Returning 0 then lets the caller's assertion fail
+            // with a readable "expected vs. actual" message instead of an opaque SQLException.
+            return resultSet.next() ? resultSet.getLong(1) : 0L;
+        }
+    }
+
+    @SneakyThrows
+    private Connection createConnect(GenericContainer<?> tdengineServer) {
+        String jdbcUrl = "jdbc:TAOS-RS://" + tdengineServer.getHost() + ":" + tdengineServer.getFirstMappedPort() + "?user=root&password=taosdata";
+        Connection conn = DriverManager.getConnection(jdbcUrl);
+        log.info("TDengine Connected! " + jdbcUrl);
+        return conn;
+    }
+
+    /**
+     * Closes both JDBC connections and stops both TDengine containers.
+     *
+     * <p>The steps are chained through {@code finally} blocks so that a failure in one step
+     * never skips the remaining cleanup; without this a failing {@code close()} would leave
+     * the containers running. If several steps fail, the exception of the last failing step
+     * is the one that propagates.
+     *
+     * @throws Exception if closing a connection or stopping a container fails
+     */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (connection1 != null) {
+                connection1.close();
+            }
+        } finally {
+            try {
+                if (connection2 != null) {
+                    connection2.close();
+                }
+            } finally {
+                try {
+                    if (tdengineServer1 != null) {
+                        tdengineServer1.stop();
+                    }
+                } finally {
+                    if (tdengineServer2 != null) {
+                        tdengineServer2.stop();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds one multi-table {@code INSERT} statement from the rows of {@link #getRawData()}.
+     *
+     * <p>The result is a single line: the prefix {@code INSERT INTO } followed by one fragment
+     * per raw row, each fragment terminated by a space. The location tag is emitted exactly as
+     * it appears in the raw row, i.e. already wrapped in single quotes. Example fragment:
+     * <pre>
+     * power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000)
+     * </pre>
+     *
+     * @return the complete insert statement
+     */
+    private static String getSQL() {
+        StringBuilder insert = new StringBuilder("INSERT INTO ");
+        for (String row : getRawData()) {
+            String[] fields = row.split(",");
+            String table = fields[0];
+            String ts = fields[1];
+            String current = fields[2];
+            String voltage = fields[3];
+            String phase = fields[4];
+            String location = fields[5];
+            String groupId = fields[6];
+
+            insert.append("power.").append(table);
+            insert.append(" USING power.meters TAGS(").append(location).append(", ").append(groupId);
+            insert.append(") VALUES(");
+            insert.append('\'').append(ts).append('\'').append(",");
+            insert.append(current).append(",");
+            insert.append(voltage).append(",");
+            insert.append(phase).append(") ");
+        }
+        return insert.toString();
+    }
+
+    private static List<String> getRawData() {
+        return Arrays.asList(
+                "d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,'California.SanFrancisco',2",
+                "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,'California.SanFrancisco',2",
+                "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,'California.SanFrancisco',2",
+                "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,'California.SanFrancisco',3",
+                "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,'California.LosAngeles',2",
+                "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,'California.LosAngeles',2",
+                "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,'California.LosAngeles',3",
+                "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,'California.LosAngeles',3"
+        );
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
new file mode 100644
index 000000000..803a10f83
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 2
+  job.mode = "STREAMING"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    TDengine {
+      url : "jdbc:TAOS-RS://flink_e2e_tdengine_src:6041/"
+      username : "root"
+      password : "taosdata"
+      database : "power"
+      stable : "meters"
+      lower_bound : "2018-10-03 14:38:05.000"
+      upper_bound : "2018-10-03 14:38:16.801"
+      result_table_name = "tdengine_result"
+    }
+  # If you would like more information about how to configure SeaTunnel, or to see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+}
+
+sink {
+    TDengine {
+        url : "jdbc:TAOS-RS://flink_e2e_tdengine_sink:6041/"
+        username : "root"
+        password : "taosdata"
+        database : "power2"
+        stable : "meters2"
+        timezone : "UTC"
+      }
+  # If you would like more information about how to configure SeaTunnel, or to see the full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 466e02041..457217735 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -45,6 +45,7 @@
         <module>connector-cdc-mysql-e2e</module>
         <module>connector-iceberg-e2e</module>
         <module>connector-iceberg-hadoop3-e2e</module>
+        <module>connector-tdengine-e2e</module>
         <module>connector-datahub-e2e</module>
     </modules>
 
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
index 6fa2aa94a..353ada932 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
@@ -123,7 +123,9 @@ public class ParallelBatchPartitionReader {
     public void close() throws IOException {
         running = false;
         try {
-            internalSource.close();
+            if (internalSource != null) {
+                internalSource.close();
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }