Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/26 08:03:55 UTC

[GitHub] [incubator-seatunnel] lhyundeadsoul opened a new pull request, #2832: [Feature][Connector-V2] add tdengine source

lhyundeadsoul opened a new pull request, #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   refer to https://github.com/apache/incubator-seatunnel/issues/2671
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or do not need tests for reason:
   * [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1059413084


##########
docs/en/connector-v2/sink/TDengine.md:
##########
@@ -0,0 +1,104 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine.
+
+## Key features
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| fields                   | config  | no       | -             |
+| tags                   | config  | no       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### fields [string]
+
+the fields of the TDengine stable
+
+e.g.
+
+```hocon
+      fields {
+        ts = "timestamp"
+        current = "float"
+        voltage = "int"
+        phase = "float"
+        location = "string"
+        groupid = "int"
+      }
+```

Review Comment:
   This config may have a problem: the `fields` option does not work in it. I agree with @CalvinKirs. All connectors now use a unified schema config; you can refer to https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java#L75 to see how to use `SeaTunnelSchema` to get the `SeaTunnelRowType`.





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1061371562


##########
docs/en/connector-v2/sink/TDengine.md:
##########
@@ -0,0 +1,104 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine.
+
+## Key features
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| fields                   | config  | no       | -             |
+| tags                   | config  | no       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### fields [string]
+
+the fields of the TDengine stable
+
+e.g.
+
+```hocon
+      fields {
+        ts = "timestamp"
+        current = "float"
+        voltage = "int"
+        phase = "float"
+        location = "string"
+        groupid = "int"
+      }
+```

Review Comment:
   Thank you for your advice.
   TDengine is a structured database, so I can get the data schema through the TDengine API without a schema config. I have therefore removed the schema config.
   
   Please review again. Thx!





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r994225504


##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read external data source data through TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| partitions_num                 | int     | no       | -             |
+| fields                   | config  | no       | -             |
+| lower_bound                | long    | no       | -             |
+| upper_bound                | long    | no       | -             |

Review Comment:
   Please fix the style of this table.



##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read external data source data through TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| partitions_num                 | int     | no       | -             |
+| fields                   | config  | no       | -             |
+| lower_bound                | long    | no       | -             |
+| upper_bound                | long    | no       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### fields [string]
+
+the fields of the TDengine when you select
+
+the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
+
+e.g.
+
+```
+      fields {
+        ts = "timestamp"
+        current = "float"
+        voltage = "int"
+        phase = "float"
+        location = "string"
+        groupid = "int"
+      }
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### lower_bound [long]
+
+the lower_bound of the migration period
+
+### upper_bound [long]
+
+the upper_bound of the migration period
+
+### partitions_num [int]
+
+the partitions_num of the migration data
+
+
+```
+     split the time range into partitions_num parts
+     if numPartitions is 1, use the whole time range
+     if numPartitions < (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions
+     
+     eg: lower_bound = 1, upper_bound = 10, partitions_num = 2
+     sql = "select * from test"
+     
+     split result
+
+     split 1: select * from test  where (ts >= 1 and ts < 6) 
+     
+     split 2: select * from test  where (ts >= 6 and ts < 11)
+
+```
+

Review Comment:
   How do users use it?
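
A minimal sketch of how the `partitions_num` description above could be read, written to reproduce the documented example (`lower_bound = 1`, `upper_bound = 10`, `partitions_num = 2`). The class `TimeRangeSplitter` and its methods are hypothetical and not taken from the PR. Treating `upper_bound` as inclusive and capping the partition count at the number of distinct values in the range are assumptions inferred from that example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR: splits a time range into
// half-open sub-ranges [start, end) over the ts column.
class TimeRangeSplitter {

    static List<long[]> split(long lowerBound, long upperBound, int partitionsNum) {
        if (upperBound < lowerBound) {
            throw new IllegalArgumentException("upper_bound must be >= lower_bound");
        }
        if (partitionsNum < 1) {
            throw new IllegalArgumentException("partitions_num must be >= 1");
        }
        // Assumption: upper_bound is inclusive, as the documented example suggests
        // (its last split ends at ts < 11 for upper_bound = 10).
        long total = upperBound - lowerBound + 1;
        // Assumption: never create more partitions than there are distinct values.
        long parts = Math.min(partitionsNum, total);
        long step = total / parts;
        long remainder = total % parts;

        List<long[]> ranges = new ArrayList<>();
        long start = lowerBound;
        for (long i = 0; i < parts; i++) {
            // Spread the remainder over the first partitions.
            long size = step + (i < remainder ? 1 : 0);
            long end = start + size;
            ranges.add(new long[] {start, end});
            start = end;
        }
        return ranges;
    }

    // Renders one split as a query, following the shape used in the doc.
    static String toSql(String baseSql, long[] range) {
        return baseSql + " where (ts >= " + range[0] + " and ts < " + range[1] + ")";
    }

    public static void main(String[] args) {
        for (long[] range : split(1, 10, 2)) {
            System.out.println(toSql("select * from test", range));
        }
    }
}
```

With the documented inputs this yields the ranges `[1, 6)` and `[6, 11)`, matching the two splits shown in the doc.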



##########
seatunnel-connectors-v2-dist/pom.xml:
##########
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   Please merge the dev branch; this module has been removed in dev.





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1003940994


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Properties;
+
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tagsNum = pluginConfig.hasPath("tags") ? pluginConfig.getObject("tags").size() : 0;
+
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+        conn = DriverManager.getConnection(jdbcUrl, connProps);
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement()) {
+            String sql = "INSERT INTO " + element.getField(0)

Review Comment:
   The `write` method is called for every row, so I suggest a more resource-efficient way to process the data. Could you use `StringBuffer` to concatenate the strings?
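
A sketch of what this suggestion could look like, assuming the statement shape used in the PR (`INSERT INTO <sub-table> using <stable> tags ( ... ) VALUES (...);`). `InsertSqlBuilder` and `buildInsert` are hypothetical names, and the values are assumed to be already converted to SQL literals. It uses `StringBuilder`, the unsynchronized counterpart of `StringBuffer`, because the builder never leaves the method.

```java
import java.util.List;

// Hypothetical helper, not part of the PR.
class InsertSqlBuilder {

    static String buildInsert(String subTable, String stable,
                              List<String> tagValues, List<String> metricValues) {
        // One builder per statement; the initial capacity is only a rough guess.
        StringBuilder sql = new StringBuilder(128);
        sql.append("INSERT INTO ").append(subTable)
            .append(" using ").append(stable)
            .append(" tags ( ");
        appendJoined(sql, tagValues);
        sql.append(" ) VALUES (");
        appendJoined(sql, metricValues);
        sql.append(");");
        return sql.toString();
    }

    // Appends the values separated by commas without building an intermediate string.
    private static void appendJoined(StringBuilder sb, List<String> values) {
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(values.get(i));
        }
    }
}
```

Note that javac already compiles a single `+` concatenation expression into `StringBuilder` or `invokedynamic` calls, so the gain over the original expression is probably small. The main saving here would come from skipping the intermediate joined strings.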



##########
seatunnel-connectors-v2-dist/pom.xml:
##########
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   We don't need this file anymore. As an alternative, you need to add this connector to `seatunnel-dist/pom.xml`.



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Properties;
+
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tagsNum = pluginConfig.hasPath("tags") ? pluginConfig.getObject("tags").size() : 0;
+
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+        conn = DriverManager.getConnection(jdbcUrl, connProps);
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement()) {
+            String sql = "INSERT INTO " + element.getField(0)
+                + " using " + config.getStable()
+                + " tags ( "
+                + tagValues
+                + " ) VALUES ("
+                + StringUtils.join(convertDataType(metrics), ",")
+                + ");";
+            final int rowCount = statement.executeUpdate(sql);

Review Comment:
   Can you use batch flushing for JDBC here instead of executing one statement per row?
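
A rough sketch of the batching idea, kept independent of any driver so it can run on its own. `BatchBuffer` and its `flusher` callback are hypothetical names. In a real JDBC writer the callback would presumably call the standard `Statement.addBatch(String)` for each buffered statement and then `Statement.executeBatch()`; whether the TDengine driver handles that efficiently is not confirmed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer, not part of the PR: collects SQL statements and
// hands them to a flusher once batchSize statements are pending.
class BatchBuffer {

    private final int batchSize;
    private final Consumer<List<String>> flusher;
    private final List<String> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<String>> flusher) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Called once per row instead of executing the statement immediately.
    void add(String sql) {
        pending.add(sql);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Should also be called before the writer closes, so no rows are lost.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // Pass a copy so the flusher cannot observe the list being cleared.
        flusher.accept(new ArrayList<>(pending));
        pending.clear();
    }
}
```

A writer built this way would call `add` from `write` and `flush` from `close` before closing the connection.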



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Properties;
+
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tagsNum = pluginConfig.hasPath("tags") ? pluginConfig.getObject("tags").size() : 0;
+
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+        conn = DriverManager.getConnection(jdbcUrl, connProps);
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement()) {
+            String sql = "INSERT INTO " + element.getField(0)
+                + " using " + config.getStable()
+                + " tags ( "
+                + tagValues
+                + " ) VALUES ("
+                + StringUtils.join(convertDataType(metrics), ",")
+                + ");";
+            final int rowCount = statement.executeUpdate(sql);
+            if (rowCount == 0) {
+                Throwables.propagateIfPossible(new RuntimeException("insert error:" + element));
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (Objects.nonNull(conn)) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    private Object[] convertDataType(Object[] objects) {
+        return Arrays.stream(objects)
+            .map(object -> {
+                if (LocalDateTime.class.equals(object.getClass())) {
+                    return "'" + ((LocalDateTime) object).format(FORMATTER) + "'";

Review Comment:
   Same as the suggestion on the `write` method.





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1005316433


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Properties;
+
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tagsNum = pluginConfig.hasPath("tags") ? pluginConfig.getObject("tags").size() : 0;
+
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+        conn = DriverManager.getConnection(jdbcUrl, connProps);
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement()) {

Review Comment:
   Very good advice. Thx





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1367159628

   @EricJoy2048 @hailin0 @Hisoka-X @TyrantLucifer @CalvinKirs 
   Hey guys, please rerun the CI and review again when you are free. Thx!




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r994224378


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+
+import com.google.common.collect.Sets;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
+    private final TDengineSourceConfig config;
+    private Set<TDengineSourceSplit> pendingSplit = Sets.newConcurrentHashSet();
+    private Set<TDengineSourceSplit> assignedSplit = Sets.newConcurrentHashSet();
+
+    public TDengineSourceSplitEnumerator(TDengineSourceConfig config, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this.config = config;
+        this.context = context;
+    }
+
+    public TDengineSourceSplitEnumerator(TDengineSourceConfig config, TDengineSourceState sourceState, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this(config, context);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return tp.hashCode() % numReaders;
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void run() {
+        pendingSplit = getAllSplit();
+        assignSplit(context.registeredReaders());
+    }
+
+    /**
+     * split the time range into numPartitions parts if numPartitions is 1, use the whole time range if numPartitions < (end - start), use (start-end) partitions
+     * <p>
+     * eg: start = 1, end = 10, numPartitions = 2 sql = "select * from test"
+     * <p>
+     * split result
+     * <p>
+     * split 1: select * from test  where (time >= 1 and time < 6)
+     * <p>
+     * split 2: select * from test  where (time >= 6 and time < 11)
+     */
+    private Set<TDengineSourceSplit> getAllSplit() {
+        String sql = "select * from " + config.getStable();
+        Set<TDengineSourceSplit> sourceSplits = Sets.newHashSet();
+        // no need numPartitions, use one partition
+        if (config.getPartitionsNum() <= 1) {
+            sourceSplits.add(new TDengineSourceSplit("0", sql));
+            return sourceSplits;
+        }
+        long start = config.getLowerBound();
+        long end = config.getUpperBound();
+        int partitionsNum = config.getPartitionsNum();
+        if (end - start < partitionsNum) {
+            partitionsNum = (int) (end - start);
+        }
+        int size = (int) (end - start) / partitionsNum;
+        long currentStart = start;
+        int i = 0;
+        while (i < partitionsNum) {
+            //Left closed right away
+            long currentEnd = i == partitionsNum - 1 ? end + 1 : currentStart + size;
+            String query = " where ts >= '" + Instant.ofEpochMilli(currentStart).atZone(ZoneId.systemDefault()).format(FORMATTER) + "' and ts < '" + Instant.ofEpochMilli(currentEnd).atZone(ZoneId.systemDefault()).format(FORMATTER) + "'";
+            String finalSQL = sql + query;
+            sourceSplits.add(new TDengineSourceSplit(String.valueOf(i + System.nanoTime()), finalSQL));
+            i++;
+            currentStart += size;
+        }
+        return sourceSplits;
+    }
+
+    @Override
+    public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> taskIDList) {

Review Comment:
   yes
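
For readers following the quoted enumerator, here is a minimal sketch of the time-range splitting that its Javadoc describes (start = 1, end = 10, two partitions giving `[1, 6)` and `[6, 11)`). The class and method names are hypothetical and are not part of the PR. Two details are deliberate departures from the quoted code and should be read as assumptions about the intent: the partition count is clamped to at least 1, because the quoted version would divide by zero when the lower and upper bounds are equal, and the owner is computed with `Math.floorMod`, because `hashCode() % numReaders` can be negative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR: splits an inclusive time range
// into left-closed, right-open sub-ranges.
final class TimeRangeSplitSketch {

    /** Returns [start, end) pairs that together cover lower..upper inclusive. */
    static List<long[]> split(long lower, long upper, int partitions) {
        if (upper < lower) {
            throw new IllegalArgumentException("upper must be >= lower");
        }
        long span = upper - lower + 1;                 // number of covered timestamps
        int n = (int) Math.max(1, Math.min(partitions, span)); // never 0, never more than span
        long size = span / n;
        List<long[]> ranges = new ArrayList<>(n);
        long start = lower;
        for (int i = 0; i < n; i++) {
            // The last range absorbs the remainder and ends just past the upper bound.
            long end = (i == n - 1) ? upper + 1 : start + size;
            ranges.add(new long[] {start, end});
            start = end;
        }
        return ranges;
    }

    /** Maps a split id to a reader index in [0, numReaders), even for negative hash codes. */
    static int owner(String splitId, int numReaders) {
        return Math.floorMod(splitId.hashCode(), numReaders);
    }

    public static void main(String[] args) {
        for (long[] r : split(1, 10, 2)) {
            System.out.println("ts >= " + r[0] + " and ts < " + r[1]);
        }
    }
}
```

The sketch works on raw epoch values; formatting them into the `ts >= '...' and ts < '...'` clause is left as in the quoted code.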





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1061371562


##########
docs/en/connector-v2/sink/TDengine.md:
##########
@@ -0,0 +1,104 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine.
+
+## Key features
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| fields                   | config  | no       | -             |
+| tags                   | config  | no       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### fields [string]
+
+the fields of the TDengine stable
+
+e.g.
+
+```hocon
+      fields {
+        ts = "timestamp"
+        current = "float"
+        voltage = "int"
+        phase = "float"
+        location = "string"
+        groupid = "int"
+      }
+```

Review Comment:
   Thank you for your advice.
   TDengine is a structured database, so I can get the data schema through the TDengine API without a schema config. I have therefore removed the schema config.
   
   Please give me a code review again. Thanks! @TyrantLucifer @CalvinKirs 
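
As an illustration of the approach described above: the `TDengineSinkWriter` constructor quoted in this thread runs `desc <database>.<stable>` and counts the rows whose `note` column equals `TAG`. The sketch below applies the same rule to separate ordinary columns from tag columns. It uses an in-memory list in place of a JDBC `ResultSet`, and the `Column` type, the method name, and the sample rows are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the rows returned by a "desc" statement.
final class DescribeRowsSketch {

    /** One described column; the field names are assumptions modelled on the thread. */
    static final class Column {
        final String name;
        final String type;
        final String note; // "TAG" for tag columns, empty otherwise

        Column(String name, String type, String note) {
            this.name = name;
            this.type = type;
            this.note = note;
        }
    }

    /** Returns the names of tag columns when tags is true, of ordinary columns otherwise. */
    static List<String> names(List<Column> rows, boolean tags) {
        List<String> out = new ArrayList<>();
        for (Column c : rows) {
            if ("TAG".equals(c.note) == tags) {
                out.add(c.name);
            }
        }
        return out;
    }
}
```

With rows modelled on the `fields` example from the docs (`ts`, `current`, `voltage`, `phase`, plus `location` and `groupid` marked as tags), `names(rows, true)` yields the two tag names, which is the information the removed config used to supply by hand.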





[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1059291557


##########
docs/en/connector-v2/sink/TDengine.md:
##########
@@ -0,0 +1,104 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine.
+
+## Key features
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| fields                   | config  | no       | -             |
+| tags                   | config  | no       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### fields [string]
+
+the fields of the TDengine stable
+
+e.g.
+
+```hocon
+      fields {
+        ts = "timestamp"
+        current = "float"
+        voltage = "int"
+        phase = "float"
+        location = "string"
+        groupid = "int"
+      }
+```

Review Comment:
   Could you use `schema`? We use this field uniformly across connectors to describe the schema.





[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1367794497

   I can't find the previous context. Why can't we implement this in `jdbc-connector`?




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1060246529


##########
seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
##########
@@ -18,8 +18,8 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

Review Comment:
   revert this file change



##########
seatunnel-connectors-v2-dist/pom.xml:
##########
@@ -0,0 +1,198 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   remove this module



##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java:
##########
@@ -37,5 +37,5 @@ public enum SqlType {
     DATE,
     TIME,
     TIMESTAMP,
-    ROW;
+    ROW

Review Comment:
   revert this file change



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java:
##########
@@ -74,6 +75,7 @@
 
 import scala.Tuple2;
 
+@Disabled("Temporary fast fix, reason1: Transactions are not supported. reason2: Invalid boolean value, should be true or false controlled by setting bool_true_representation and bool_false_representation")

Review Comment:
   revert this file change





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1007564346


##########
seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
##########
@@ -18,8 +18,8 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

Review Comment:
   revert this change
   
   Do not submit the example module





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1003068815


##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine

Review Comment:
   The sink takes the same parameters as the source, and I have already provided a sink demo.





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1063221965


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+
+@Slf4j
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        conn = DriverManager.getConnection(jdbcUrl);
+        try (Statement statement = conn.createStatement()) {
+            final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+            while (metaResultSet.next()) {
+                if (StringUtils.equals("TAG", metaResultSet.getString("note"))) {
+                    tagsNum++;
+                }
+            }
+        }
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            String sql = "INSERT INTO " + element.getField(0)

Review Comment:
   Whether concatenation or `format` is better has long been debated; see
   https://stackoverflow.com/questions/925423/is-it-better-practice-to-use-string-format-over-string-concatenation-in-java
   
   `format` is much slower but easy to localize. Concatenation, by contrast, is faster and is optimized by the compiler: it turns `+` into `StringBuilder` appends automatically, although concatenation across loop iterations still allocates a new builder on each pass.
   
   Still, it's OK here. I will change it to `format`.
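
A minimal sketch of the two styles being compared, for reference. The class name, method names, and the sample table and values are hypothetical; this is not the statement the PR actually builds. Both methods return the same string, and no timing claim is made here beyond what the comments above already say.

```java
import java.util.Locale;

// Hypothetical illustration of the two ways to build the same INSERT statement.
final class InsertSqlStyles {

    // Plain concatenation: the compiler lowers "+" to builder-style appends.
    static String byConcat(String table, String values) {
        return "INSERT INTO " + table + " VALUES (" + values + ")";
    }

    // String.format: the pattern is parsed on every call, which costs more
    // but keeps the shape of the statement visible in one place.
    static String byFormat(String table, String values) {
        return String.format(Locale.ROOT, "INSERT INTO %s VALUES (%s)", table, values);
    }

    public static void main(String[] args) {
        String values = "'2022-10-26 08:03:55.000',10.3,219,0.31";
        System.out.println(byConcat("d1001", values));
        System.out.println(byFormat("d1001", values));
    }
}
```

Since `write` runs once per row, the per-call cost of parsing the format pattern is the trade-off under discussion.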





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1063231474


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+
+@Slf4j
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        conn = DriverManager.getConnection(jdbcUrl);
+        try (Statement statement = conn.createStatement()) {
+            final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+            while (metaResultSet.next()) {
+                if (StringUtils.equals("TAG", metaResultSet.getString("note"))) {
+                    tagsNum++;
+                }
+            }
+        }
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            String sql = "INSERT INTO " + element.getField(0)

Review Comment:
   Thank you for your reply. IMO, using `format` makes the code more readable and maintainable; if this operation were invoked only once, `format` would be the best choice. But in this scenario the operation runs many times and performance is important, so I think the previous version is better.





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r994228620


##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read external data source data through TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| partitions_num                 | int     | no       | -             |
+| fields                   | config  | no       | -             |
+| lower_bound                | long    | no       | -             |
+| upper_bound                | long    | no       | -             |

Review Comment:
   merge dev branch code





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1291650237

   > 
   




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1008706803


##########
seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
##########
@@ -18,8 +18,8 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

Review Comment:
   > The gitignore file contains `seatunnel-examples`; I am curious why it doesn't take effect
   
   `.gitignore` only keeps untracked files from being added; files that are already tracked are still checked for changes.





[GitHub] [incubator-seatunnel] EricJoy2048 closed pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 closed pull request #2832: [Feature][Connector-V2] add tdengine source
URL: https://github.com/apache/incubator-seatunnel/pull/2832




[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1373062196

   Hey guys, thank you for all your suggestions.
   All checks have passed now.
   Could you help me merge? @CalvinKirs @hailin0 




[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1059413084


##########
docs/en/connector-v2/sink/TDengine.md:
##########
@@ -0,0 +1,104 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine.
+
+## Key features
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| fields                   | config  | no       | -             |
+| tags                   | config  | no       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### fields [string]
+
+the fields of the TDengine stable
+
+e.g.
+
+```hocon
+      fields {
+        ts = "timestamp"
+        current = "float"
+        voltage = "int"
+        phase = "float"
+        location = "string"
+        groupid = "int"
+      }
+```

Review Comment:
   This config file may have a problem: the `fields` option does not work in it. I agree with @CalvinKirs. All connectors now use a unified schema config; you can refer to https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java#L75 to see how to use `SeaTunnelSchema` to get a `SeaTunnelRowType`.





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1062140786


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.tdengine;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class TDengineIT extends TestSuiteBase implements TestResource {
+    private static final String DOCKER_IMAGE = "tdengine/tdengine:3.0.2.1";
+    private static final String NETWORK_ALIASES1 = "flink_e2e_tdengine_src";
+    private static final String NETWORK_ALIASES2 = "flink_e2e_tdengine_sink";
+    private static final int PORT = 6041;
+
+    private GenericContainer<?> tdengineServer1;
+    private GenericContainer<?> tdengineServer2;
+    private Connection connection1;
+    private Connection connection2;
+    private int testDataCount;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        tdengineServer1 = new GenericContainer<>(DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(NETWORK_ALIASES1)
+                .withExposedPorts(PORT)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)))
+                .waitingFor(new HostPortWaitStrategy()
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+        tdengineServer2 = new GenericContainer<>(DOCKER_IMAGE)

Review Comment:
   Why do we need to start two nodes? Is it possible to start only one?





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r994228620


##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read external data source data through TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| partitions_num                 | int     | no       | -             |
+| fields                   | config  | no       | -             |
+| lower_bound                | long    | no       | -             |
+| upper_bound                | long    | no       | -             |

Review Comment:
   Please merge the latest code from the dev branch.





[GitHub] [incubator-seatunnel] lhyundeadsoul closed pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul closed pull request #2832: [Feature][Connector-V2] add tdengine source
URL: https://github.com/apache/incubator-seatunnel/pull/2832




[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1059399907


##########
docs/en/connector-v2/sink/TDengine.md:
##########
@@ -0,0 +1,104 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine.
+
+## Key features
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| fields                   | config  | no       | -             |
+| tags                   | config  | no       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### fields [string]
+
+the fields of the TDengine stable
+
+e.g.
+
+```hocon
+      fields {
+        ts = "timestamp"
+        current = "float"
+        voltage = "int"
+        phase = "float"
+        location = "string"
+        groupid = "int"
+      }
+```

Review Comment:
   I referred to the IoTDB connector (`seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf`).
   I also think that `fields` is the default schema key, according to `org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema#FIELD_KEY` as used in `org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema#buildWithConfig`.
   





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1063938409


##########
seatunnel-connectors-v2/connector-tdengine/pom.xml:
##########
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   Add this module [here](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml#L232).



##########
seatunnel-connectors-v2/connector-tdengine/pom.xml:
##########
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   Add the JDBC driver to this file:
   
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml#L183



##########
seatunnel-connectors-v2/connector-tdengine/pom.xml:
##########
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   ```xml
                   <dependency>
                       <groupId>org.apache.seatunnel</groupId>
                       <artifactId>connector-tdengine</artifactId>
                       <version>${project.version}</version>
                       <scope>provided</scope>
                   </dependency>
   
   
                   <!-- jdbc driver -->
                   <dependency>
                       <groupId>com.taosdata.jdbc</groupId>
                       <artifactId>taos-jdbcdriver</artifactId>
                       <version>${version}</version>
                       <scope>provided</scope>
                   </dependency>
   ```





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1063943922


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import com.google.common.collect.Sets;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+@Slf4j
+public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final TDengineSourceConfig config;
+
+    private final Set<TDengineSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    private Connection conn;
+
+    public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
+        this.config = config;
+        this.sourceSplits = Sets.newHashSet();
+        this.context = readerContext;
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        sourceSplits.forEach(split -> {

Review Comment:
   The JDBC connector's implementation, `org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader#pollNext`, does the same thing: it also wraps the read in `synchronized (output.getCheckpointLock())`. Is there an existing example of a smaller lock granularity that I can refer to?
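
   For illustration only, here is a minimal sketch of what a smaller lock granularity could look like. It is not taken from SeaTunnel: `ListCollector`, `readCoarse`, and `readFine` are hypothetical names, and the collector is a stand-in that only mimics the `getCheckpointLock()` / `collect(...)` shape mentioned above. The idea is to fetch each row outside the lock and hold the lock only while emitting it.

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;

   final class LockGranularitySketch {

       /** Hypothetical stand-in for a collector that exposes a checkpoint lock. */
       static final class ListCollector {
           private final Object checkpointLock = new Object();
           final List<String> rows = new ArrayList<>();

           Object getCheckpointLock() {
               return checkpointLock;
           }

           void collect(String row) {
               rows.add(row);
           }
       }

       /** Coarse granularity: the lock is held while the whole split is read. */
       static void readCoarse(Iterator<String> source, ListCollector output) {
           synchronized (output.getCheckpointLock()) {
               while (source.hasNext()) {
                   output.collect(source.next());
               }
           }
       }

       /** Finer granularity: fetch outside the lock, lock only around each emit. */
       static void readFine(Iterator<String> source, ListCollector output) {
           while (source.hasNext()) {
               String row = source.next(); // potentially slow I/O, done without the lock
               synchronized (output.getCheckpointLock()) {
                   output.collect(row);
               }
           }
       }
   }
   ```

   With the finer variant a checkpoint can be taken between two rows of the same split, so the reader would presumably also need to record how far it has read; the sketch does not cover that part.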





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1063940999


##########
seatunnel-connectors-v2/connector-tdengine/pom.xml:
##########
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   done





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r982402124


##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read external data source data through TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| partitions_num                 | int     | no       | -             |
+| fields                   | config  | no       | -             |
+| lower_bound                | long    | no       | -             |
+| upper_bound                | long    | no       | -             |

Review Comment:
   Please check the style.



##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read external data source data through TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| partitions_num                 | int     | no       | -             |
+| fields                   | config  | no       | -             |
+| lower_bound                | long    | no       | -             |
+| upper_bound                | long    | no       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### fields [string]
+
+the fields of the TDengine when you select
+
+the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
+
+e.g.
+
+```
+      fields {
+        ts = "timestamp"
+        current = "float"
+        voltage = "int"
+        phase = "float"
+        location = "string"
+        groupid = "int"
+      }
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### lower_bound [long]
+
+the lower_bound of the migration period
+
+### upper_bound [long]
+
+the upper_bound of the migration period
+
+### partitions_num [int]
+
+the partitions_num of the migration data
+
+
+```
+     split the time range into partitions_num parts
+     if numPartitions is 1, use the whole time range
+     if numPartitions < (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions
+     
+     eg: lower_bound = 1, upper_bound = 10, partitions_num = 2
+     sql = "select * from test"
+     
+     split result
+
+     split 1: select * from test  where (ts >= 1 and ts < 6) 
+     
+     split 2: select * from test  where (ts >= 6 and ts < 11)
+
+```
+

Review Comment:
   Could you add an example?
   
   Reference:
   https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/connector-v2/source/IoTDB.md#examples
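
   As a starting point, the split rule described in the doc above could be sketched roughly as follows. This is an illustration, not the connector's code: the class and method names are hypothetical, `upper_bound` is treated as inclusive (which matches the `ts < 11` in the doc's sample), and it assumes the number of splits is capped at the size of the time range, which the doc's wording leaves unclear.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class TimeRangeSplitSketch {

       /** Splits [lowerBound, upperBound] into up to partitionsNum half-open ranges on ts. */
       static List<String> split(String sql, long lowerBound, long upperBound, int partitionsNum) {
           if (upperBound < lowerBound) {
               throw new IllegalArgumentException("upper_bound must be >= lower_bound");
           }
           long total = upperBound - lowerBound + 1; // upper bound treated as inclusive
           // Assumption: never create more splits than there are points in the range.
           long parts = Math.max(1L, Math.min((long) partitionsNum, total));
           long base = total / parts;
           long remainder = total % parts; // the first `remainder` splits get one extra point

           List<String> result = new ArrayList<>();
           long start = lowerBound;
           for (long i = 0; i < parts; i++) {
               long end = start + base + (i < remainder ? 1 : 0);
               result.add(String.format("%s where (ts >= %d and ts < %d)", sql, start, end));
               start = end;
           }
           return result;
       }

       public static void main(String[] args) {
           // Reproduces the doc's sample: lower_bound = 1, upper_bound = 10, partitions_num = 2
           split("select * from test", 1, 10, 2).forEach(System.out::println);
       }
   }
   ```

   Spreading the remainder over the first splits keeps the ranges contiguous and non-empty even when the range does not divide evenly.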



##########
seatunnel-connectors-v2-dist/pom.xml:
##########
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   remove this module



##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read external data source data through TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type    | required | default value |
+|----------------------------|---------|----------|---------------|
+| url                       | string  | yes      | -             |
+| username                       | string     | yes      | -             |
+| password                  | string  | yes      | -             |
+| database                        | string  | yes      |          |
+| stable                     | string  | yes      | -             |
+| partitions_num                 | int     | no       | -             |
+| fields                   | config  | no       | -             |
+| lower_bound                | long    | no       | -             |
+| upper_bound                | long    | no       | -             |
+
+### url [string] 
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### fields [string]
+
+the fields of the TDengine when you select
+
+the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
+
+e.g.
+
+```

Review Comment:
   ```suggestion
   ```hocon
   ```



##########
seatunnel-connectors-v2/connector-tdengine/pom.xml:
##########
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   Add this module [here](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml#L232).



##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine

Review Comment:
   Add TDengine sink docs?





[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1257958634

   Hi, please fix the CI failures and resolve the merge conflicts. Thanks!




[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1003954814


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Properties;
+
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tagsNum = pluginConfig.hasPath("tags") ? pluginConfig.getObject("tags").size() : 0;
+
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+        conn = DriverManager.getConnection(jdbcUrl, connProps);
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement()) {

Review Comment:
   ```suggestion
           try (Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
   ```
   Use these parameters to save memory.





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1062260170


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.tdengine;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class TDengineIT extends TestSuiteBase implements TestResource {
+    private static final String DOCKER_IMAGE = "tdengine/tdengine:3.0.2.1";
+    private static final String NETWORK_ALIASES1 = "flink_e2e_tdengine_src";
+    private static final String NETWORK_ALIASES2 = "flink_e2e_tdengine_sink";
+    private static final int PORT = 6041;
+
+    private GenericContainer<?> tdengineServer1;
+    private GenericContainer<?> tdengineServer2;
+    private Connection connection1;
+    private Connection connection2;
+    private int testDataCount;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        tdengineServer1 = new GenericContainer<>(DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(NETWORK_ALIASES1)
+                .withExposedPorts(PORT)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)))
+                .waitingFor(new HostPortWaitStrategy()
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+        tdengineServer2 = new GenericContainer<>(DOCKER_IMAGE)

Review Comment:
   
   > Why do we need to start two nodes? Is it possible to start only one node?
   
   Sorry, a single container can only create one database, because the TDengine parameter `supportVnodes` limits it.
   
   Is there any problem with starting two nodes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs merged pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
CalvinKirs merged PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1063941421


##########
seatunnel-connectors-v2/connector-tdengine/pom.xml:
##########
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   
   Add this dependency [here](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml#L232) and here:
   
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml#L447
   
   ```xml
   <!-- jdbc driver -->
   <dependency>
       <groupId>com.taosdata.jdbc</groupId>
       <artifactId>taos-jdbcdriver</artifactId>
       <version>${version}</version>
       <scope>provided</scope>
   </dependency>
   ```








[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1373219560

   > @EricJoy2048 @hailin0 @Hisoka-X @TyrantLucifer @CalvinKirs Hey guys, please help to give me a CR if you are free. Thx!
   
   Hi @lhyundeadsoul, overall LGTM, with one small nit: you should unify the exceptions in the connector and update the user-quick-manual. See https://github.com/apache/incubator-seatunnel/issues/2930 for more details.




[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1373369130

   > > @EricJoy2048 @hailin0 @Hisoka-X @TyrantLucifer @CalvinKirs Hey guys, please help to give me a CR if you are free. Thx!
   > 
   > Hi @lhyundeadsoul, overall LGTM, with one small nit: you should unify the exceptions in the connector and update the user-quick-manual. See #2930 for more details.
   
   Exception unification has been done.




[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1007817241


##########
seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
##########
@@ -18,8 +18,8 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

Review Comment:
   The `.gitignore` file contains `seatunnel-examples`; I am curious why it doesn't take effect here.





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1005313434


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Properties;
+
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tagsNum = pluginConfig.hasPath("tags") ? pluginConfig.getObject("tags").size() : 0;
+
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+        conn = DriverManager.getConnection(jdbcUrl, connProps);
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement()) {
+            String sql = "INSERT INTO " + element.getField(0)
+                + " using " + config.getStable()
+                + " tags ( "
+                + tagValues
+                + " ) VALUES ("
+                + StringUtils.join(convertDataType(metrics), ",")
+                + ");";
+            final int rowCount = statement.executeUpdate(sql);

Review Comment:
   The `org.apache.seatunnel.connectors.seatunnel.tdengine.sink.TDengineSinkWriter#write` method takes only a single `SeaTunnelRow` parameter, so I think `batch flush` can be used once we have a `write(List<SeaTunnelRow>)` API.
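
For illustration only, here is a minimal sketch of what batching behind a single-row `write` could look like. It is not what this PR does: `BatchBuffer`, its `batchSize` parameter, and the `flusher` callback are hypothetical names, and the sketch assumes the writer would call `flush()` once more when it closes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of the PR): collects rows that arrive one at a
 * time and hands them to a callback in batches.
 */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Called once per row, in the same way as a single-row write(). */
    void add(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands over whatever is buffered; does nothing when the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        List<List<String>> flushed = new ArrayList<>();
        BatchBuffer<String> rows = new BatchBuffer<String>(2, batch -> flushed.add(batch));
        rows.add("r1");
        rows.add("r2"); // the buffer reaches batchSize here and is flushed
        rows.add("r3");
        rows.flush();   // final flush, as a writer would do on close
        System.out.println(flushed); // prints [[r1, r2], [r3]]
    }
}
```

In a real writer the callback would turn each batch into one multi-row statement; how that maps onto the sink API is left open here.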






[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1062141169


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.tdengine;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class TDengineIT extends TestSuiteBase implements TestResource {
+    private static final String DOCKER_IMAGE = "tdengine/tdengine:3.0.2.1";
+    private static final String NETWORK_ALIASES1 = "flink_e2e_tdengine_src";
+    private static final String NETWORK_ALIASES2 = "flink_e2e_tdengine_sink";
+    private static final int PORT = 6041;
+
+    private GenericContainer<?> tdengineServer1;
+    private GenericContainer<?> tdengineServer2;
+    private Connection connection1;
+    private Connection connection2;
+    private int testDataCount;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        tdengineServer1 = new GenericContainer<>(DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(NETWORK_ALIASES1)
+                .withExposedPorts(PORT)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)))
+                .waitingFor(new HostPortWaitStrategy()
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+        tdengineServer2 = new GenericContainer<>(DOCKER_IMAGE)

Review Comment:
   @lhyundeadsoul 





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1367933624

   > I can't find the previous context, why can't we implement this in `jdbc-connector`?
   
   We implement the TDengine connector separately because TDengine has some distinctive concepts that do not belong to JDBC, such as `subtable` and `tags`.
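
To make these concepts concrete, the sketch below prints two TDengine statements: one that creates a super table with tag columns, and one that inserts into a subtable, creating it on the fly from the super table. The table, column, and tag names (`meters`, `d1001`, `location`, `group_id`) are made up for the example and do not come from this PR.

```java
/**
 * Illustration only: builds TDengine SQL that uses a super table, tags and a
 * subtable. All table, column and tag names are invented for this example.
 */
final class TDengineConceptsSketch {

    /** A super table defines the shared schema plus the tag columns. */
    static String createSuperTable() {
        return "CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT)"
            + " TAGS (location BINARY(64), group_id INT)";
    }

    /** Each subtable belongs to one super table and carries its own tag values. */
    static String insertIntoSubTable(String subTable, String location, int groupId) {
        return "INSERT INTO " + subTable
            + " USING meters TAGS ('" + location + "', " + groupId + ")"
            + " VALUES (NOW, 10.3, 219)";
    }

    public static void main(String[] args) {
        System.out.println(createSuperTable());
        System.out.println(insertIntoSubTable("d1001", "California.SanFrancisco", 2));
    }
}
```

The `USING ... TAGS (...)` clause has no counterpart in a generic JDBC sink, which is the kind of difference the comment above refers to.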




[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1063191401


##########
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java:
##########
@@ -103,7 +103,7 @@ private static int[] parseDecimalPS(String type) {
         return new int[]{precision, scale};
     }
 
-    private static SeaTunnelDataType<?> parseTypeByString(String type) {
+    public static SeaTunnelDataType<?> parseTypeByString(String type) {

Review Comment:
   Revert?



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+
+@Slf4j
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        conn = DriverManager.getConnection(jdbcUrl);
+        try (Statement statement = conn.createStatement()) {
+            final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+            while (metaResultSet.next()) {
+                if (StringUtils.equals("TAG", metaResultSet.getString("note"))) {
+                    tagsNum++;
+                }
+            }
+        }
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            String sql = "INSERT INTO " + element.getField(0)

Review Comment:
   `String.format` would be better here.
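
As a rough sketch of this suggestion, the statement could be assembled with a single format template. `InsertSqlFormatSketch` and `buildInsertSql` are hypothetical names; the sketch assumes the tag and metric values have already been rendered as SQL literals (as `convertDataType` appears to do in the diff) and, like the original concatenation, performs no escaping.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: the INSERT statement built with String.format. */
final class InsertSqlFormatSketch {

    /**
     * All values are expected to be ready-made SQL literals; nothing is
     * escaped here, so this is only suitable for trusted input.
     */
    static String buildInsertSql(String subTable, String superTable,
                                 List<String> tagValues, List<String> metricValues) {
        return String.format("INSERT INTO %s USING %s TAGS (%s) VALUES (%s);",
            subTable,
            superTable,
            String.join(",", tagValues),
            String.join(",", metricValues));
    }

    public static void main(String[] args) {
        String sql = buildInsertSql(
            "d1001",
            "meters",
            Arrays.asList("'California.SanFrancisco'", "2"),
            Arrays.asList("'2023-01-01 00:00:00.000'", "10.3", "219"));
        // prints: INSERT INTO d1001 USING meters TAGS ('California.SanFrancisco',2) VALUES ('2023-01-01 00:00:00.000',10.3,219);
        System.out.println(sql);
    }
}
```

The template keeps the shape of the statement visible in one place, which is the readability gain over chained `+` concatenation.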





[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1064305848


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum TDengineConnectorErrorCode implements SeaTunnelErrorCode {
+    SQL_OPERATION_FAILED("TDengine-01", "execute sql failed"),

Review Comment:
   SQL_OPERATION_FAILED --> CommonErrorCode.SQL_OPERATION_FAILED
   READER_FAILED --> CommonErrorCode.READER_OPERATION_FAILED
   WRITER_FAILED --> CommonErrorCode.WRITER_OPERATION_FAILED
   TYPE_MAPPER_FAILED --> CommonErrorCode.UNSUPPORTED_DATA_TYPE
   
   `CONNECTION_FAILED` should be removed; connection failures should fall under the reader or writer operation failures instead.
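
The last point could look roughly like the sketch below. It is self-contained and therefore uses stand-in types: `CommonErrorCodeStandIn` and `ConnectorExceptionStandIn` are hypothetical and only mimic the constants named in this comment, they are not SeaTunnel's real classes or constructors.

```java
import java.sql.SQLException;

/** Stand-in for the common error codes; lists only the constants named in the review. */
enum CommonErrorCodeStandIn {
    SQL_OPERATION_FAILED,
    READER_OPERATION_FAILED,
    WRITER_OPERATION_FAILED,
    UNSUPPORTED_DATA_TYPE
}

/** Stand-in for the connector exception; the real class may look different. */
final class ConnectorExceptionStandIn extends RuntimeException {
    final CommonErrorCodeStandIn code;

    ConnectorExceptionStandIn(CommonErrorCodeStandIn code, String message, Throwable cause) {
        super(code + ": " + message, cause);
        this.code = code;
    }
}

final class ErrorCodeMappingSketch {

    /** A connection failure on the source side is reported as a reader failure. */
    static ConnectorExceptionStandIn onReaderConnectFailure(SQLException e) {
        return new ConnectorExceptionStandIn(
            CommonErrorCodeStandIn.READER_OPERATION_FAILED, "cannot connect to TDengine", e);
    }

    /** A connection failure on the sink side is reported as a writer failure. */
    static ConnectorExceptionStandIn onWriterConnectFailure(SQLException e) {
        return new ConnectorExceptionStandIn(
            CommonErrorCodeStandIn.WRITER_OPERATION_FAILED, "cannot connect to TDengine", e);
    }

    public static void main(String[] args) {
        SQLException cause = new SQLException("connection refused");
        System.out.println(onReaderConnectFailure(cause).getMessage());
        System.out.println(onWriterConnectFailure(cause).getMessage());
    }
}
```

With this split there is no need for a dedicated connection error code: the caller already knows whether it is reading or writing when the connection attempt fails.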
   
   



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator.Context;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+
+import com.google.common.collect.Sets;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> {
+
+    private final Context<TDengineSourceSplit> context;
+    private final TDengineSourceConfig config;
+    private Set<TDengineSourceSplit> pendingSplit = new HashSet<>();
+    private Set<TDengineSourceSplit> assignedSplit = new HashSet<>();
+    private Connection conn;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, Context<TDengineSourceSplit> context) {
+        this(seaTunnelRowType, config, null, context);
+    }
+
+    public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, TDengineSourceState sourceState, Context<TDengineSourceSplit> context) {
+        this.config = config;
+        this.context = context;
+        this.seaTunnelRowType = seaTunnelRowType;
+        if (sourceState != null) {
+            this.assignedSplit = sourceState.getAssignedSplit();
+        }
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    @SneakyThrows
+    @Override
+    public void open() {
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        conn = DriverManager.getConnection(jdbcUrl);
+    }
+
+    @Override
+    public void run() throws SQLException {
+        pendingSplit = getAllSplits();
+        assignSplit(context.registeredReaders());
+    }
+
+    /*
+     * 1. get timestampField
+     * 2. get all sub tables of configured super table
+     * 3. each split has one sub table
+     */
+    private Set<TDengineSourceSplit> getAllSplits() throws SQLException {
+        final String timestampFieldName;
+        try (Statement statement = conn.createStatement()) {
+            final ResultSet fieldNameResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+            fieldNameResultSet.next();
+            timestampFieldName = fieldNameResultSet.getString(1);
+        }
+
+        final Set<TDengineSourceSplit> splits = Sets.newHashSet();
+        try (Statement statement = conn.createStatement()) {
+            String metaSQL = "select table_name from information_schema.ins_tables where db_name = '" + config.getDatabase() + "' and stable_name='" + config.getStable() + "';";
+            ResultSet subTableNameResultSet = statement.executeQuery(metaSQL);
+            while (subTableNameResultSet.next()) {
+                final String subTableName = subTableNameResultSet.getString(1);
+                final TDengineSourceSplit splitBySubTable = createSplitBySubTable(subTableName, timestampFieldName);
+                splits.add(splitBySubTable);
+            }
+        }
+        return splits;
+    }
+
+    private TDengineSourceSplit createSplitBySubTable(String subTableName, String timestampFieldName) {
+        String selectFields = Arrays.stream(seaTunnelRowType.getFieldNames()).skip(1).collect(Collectors.joining(","));
+        String subTableSQL = "select " + selectFields + " from " + config.getDatabase() + "." + subTableName;
+        String start = config.getLowerBound();
+        String end = config.getUpperBound();
+        if (start != null || end != null) {
+            String startCondition = null;
+            String endCondition = null;
+            // Left-closed, right-open interval: [start, end)
+            if (start != null) {
+                startCondition = timestampFieldName + " >= '" + start + "'";
+            }
+            if (end != null) {
+                endCondition = timestampFieldName + " < '" + end + "'";
+            }
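+            // Skip a missing bound: StringUtils.join renders null elements as empty strings, which would leave a dangling " and "
+            String query = Arrays.stream(new String[]{startCondition, endCondition}).filter(Objects::nonNull).collect(Collectors.joining(" and "));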
+            subTableSQL = subTableSQL + " where " + query;
+        }
+
+        return new TDengineSourceSplit(subTableName, subTableSQL);
+    }
+
+    @Override
+    public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> taskIDList) {
+        assignedSplit = pendingSplit.stream()
+                .map(split -> {
+                    int splitOwner = getSplitOwner(split.splitId(), context.currentParallelism());
+                    if (taskIDList.contains(splitOwner)) {
+                        context.assignSplit(splitOwner, split);
+                        return split;
+                    } else {
+                        return null;
+                    }
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        pendingSplit.clear();
+    }
+
+    @Override
+    public TDengineSourceState snapshotState(long checkpointId) {
+        return new TDengineSourceState(assignedSplit);
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        SourceSplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        //nothing to do
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        SourceSplitEnumerator.super.notifyCheckpointAborted(checkpointId);
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (!Objects.isNull(conn)) {
+                conn.close();
+            }
+        } catch (SQLException e) {
+            throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "TDengine split_enumerator connection close failed", e);

Review Comment:
   As below
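
A side note on `assignSplit` in the hunk above, offered as a sketch rather than as the PR's code: as quoted, every call overwrites `assignedSplit` and then clears `pendingSplit`, so splits whose owner is not in `taskIDList` are dropped instead of waiting for their reader. The stand-alone example below keeps the same owner formula as `getSplitOwner` but leaves unowned splits pending. The class name `SplitAssignmentSketch` and its methods are hypothetical, and plain strings stand in for `TDengineSourceSplit`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone sketch; not part of the SeaTunnel API.
class SplitAssignmentSketch {

    private final Set<String> pending = new HashSet<>();
    private final Set<String> assigned = new HashSet<>();
    private final int parallelism;

    SplitAssignmentSketch(int parallelism, Collection<String> splitIds) {
        this.parallelism = parallelism;
        this.pending.addAll(splitIds);
    }

    // Same formula as getSplitOwner in the quoted hunk.
    static int owner(String splitId, int numReaders) {
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    // Hands out only the splits owned by the given readers.
    // Splits owned by other readers stay in the pending set.
    List<String> assignTo(Collection<Integer> readerIds) {
        List<String> handedOut = new ArrayList<>();
        Iterator<String> it = pending.iterator();
        while (it.hasNext()) {
            String splitId = it.next();
            if (readerIds.contains(owner(splitId, parallelism))) {
                handedOut.add(splitId);
                assigned.add(splitId); // accumulate instead of overwriting
                it.remove();           // remove only what was handed out
            }
        }
        return handedOut;
    }

    int pendingSize() {
        return pending.size();
    }

    int assignedSize() {
        return assigned.size();
    }
}
```

With this shape, a reader that registers late still finds its splits in the pending set, and a checkpoint taken from the accumulated assigned set reflects every split handed out so far.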



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import com.google.common.collect.Sets;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+@Slf4j
+public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final TDengineSourceConfig config;
+
+    private final Set<TDengineSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    private Connection conn;
+
+    public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
+        this.config = config;
+        this.sourceSplits = Sets.newHashSet();
+        this.context = readerContext;
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        synchronized (collector.getCheckpointLock()) {
+            sourceSplits.forEach(split -> {
+                try {
+                    read(split, collector);
+                } catch (Exception e) {
+                    throw new TDengineConnectorException(TDengineConnectorErrorCode.READER_FAILED, "TDengine split read error", e);
+                }
+            });
+        }
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded TDengine source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public void open(){
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        // TODO: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD is set to "true",
+        // an exception is thrown: Caused by: java.sql.SQLException: can't create connection with server
+        // under a Docker network environment
+        // @bobo (tdengine)
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false");
+        try {
+            conn = DriverManager.getConnection(jdbcUrl, connProps);
+        } catch (SQLException e) {
+            throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "get TDengine connection failed:" + jdbcUrl);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (!Objects.isNull(conn)) {
+                conn.close();
+            }
+        } catch (SQLException e) {
+            throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "TDengine reader connection close failed", e);

Review Comment:
   Use `CommonErrorCode.READER_OPERATION_FAILED`



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TDengineTypeMapper {
+
+
+    // ============================data types=====================
+
+    private static final String TDENGINE_UNKNOWN = "UNKNOWN";
+    private static final String TDENGINE_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    private static final String TDENGINE_TINYINT = "TINYINT";
+    private static final String TDENGINE_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String TDENGINE_SMALLINT = "SMALLINT";
+    private static final String TDENGINE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String TDENGINE_MEDIUMINT = "MEDIUMINT";
+    private static final String TDENGINE_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String TDENGINE_INT = "INT";
+    private static final String TDENGINE_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String TDENGINE_INTEGER = "INTEGER";
+    private static final String TDENGINE_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String TDENGINE_BIGINT = "BIGINT";
+    private static final String TDENGINE_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String TDENGINE_DECIMAL = "DECIMAL";
+    private static final String TDENGINE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String TDENGINE_FLOAT = "FLOAT";
+    private static final String TDENGINE_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String TDENGINE_DOUBLE = "DOUBLE";
+    private static final String TDENGINE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    private static final String TDENGINE_CHAR = "CHAR";
+    private static final String TDENGINE_VARCHAR = "VARCHAR";
+    private static final String TDENGINE_TINYTEXT = "TINYTEXT";
+    private static final String TDENGINE_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String TDENGINE_TEXT = "TEXT";
+    private static final String TDENGINE_LONGTEXT = "LONGTEXT";
+    private static final String TDENGINE_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String TDENGINE_DATE = "DATE";
+    private static final String TDENGINE_DATETIME = "DATETIME";
+    private static final String TDENGINE_TIME = "TIME";
+    private static final String TDENGINE_TIMESTAMP = "TIMESTAMP";
+    private static final String TDENGINE_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    private static final String TDENGINE_TINYBLOB = "TINYBLOB";
+    private static final String TDENGINE_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String TDENGINE_BLOB = "BLOB";
+    private static final String TDENGINE_LONGBLOB = "LONGBLOB";
+    private static final String TDENGINE_BINARY = "BINARY";
+    private static final String TDENGINE_VARBINARY = "VARBINARY";
+    private static final String TDENGINE_GEOMETRY = "GEOMETRY";
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static SeaTunnelDataType<?> mapping(String tdengineType) {
+        switch (tdengineType) {
+            case TDENGINE_BIT:
+                return BasicType.BOOLEAN_TYPE;
+            case TDENGINE_TINYINT:
+            case TDENGINE_TINYINT_UNSIGNED:
+            case TDENGINE_SMALLINT:
+            case TDENGINE_SMALLINT_UNSIGNED:
+            case TDENGINE_MEDIUMINT:
+            case TDENGINE_MEDIUMINT_UNSIGNED:
+            case TDENGINE_INT:
+            case TDENGINE_INTEGER:
+            case TDENGINE_YEAR:
+                return BasicType.INT_TYPE;
+            case TDENGINE_INT_UNSIGNED:
+            case TDENGINE_INTEGER_UNSIGNED:
+            case TDENGINE_BIGINT:
+                return BasicType.LONG_TYPE;
+            case TDENGINE_BIGINT_UNSIGNED:
+                return new DecimalType(20, 0);
+            case TDENGINE_DECIMAL:
+                log.warn("{} will probably cause value overflow.", TDENGINE_DECIMAL);
+                return new DecimalType(38, 18);
+            case TDENGINE_DECIMAL_UNSIGNED:
+                return new DecimalType(38, 18);
+            case TDENGINE_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case TDENGINE_FLOAT_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", TDENGINE_FLOAT_UNSIGNED);
+                return BasicType.FLOAT_TYPE;
+            case TDENGINE_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case TDENGINE_DOUBLE_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", TDENGINE_DOUBLE_UNSIGNED);
+                return BasicType.DOUBLE_TYPE;
+            case TDENGINE_CHAR:
+            case TDENGINE_TINYTEXT:
+            case TDENGINE_MEDIUMTEXT:
+            case TDENGINE_TEXT:
+            case TDENGINE_VARCHAR:
+            case TDENGINE_JSON:
+            case TDENGINE_LONGTEXT:
+                return BasicType.STRING_TYPE;
+            case TDENGINE_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TDENGINE_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case TDENGINE_DATETIME:
+            case TDENGINE_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+            case TDENGINE_TINYBLOB:
+            case TDENGINE_MEDIUMBLOB:
+            case TDENGINE_BLOB:
+            case TDENGINE_LONGBLOB:
+            case TDENGINE_VARBINARY:
+            case TDENGINE_BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+
+            // Not supported yet
+            case TDENGINE_GEOMETRY:
+            case TDENGINE_UNKNOWN:
+            default:
+                throw new TDengineConnectorException(TDengineConnectorErrorCode.TYPE_MAPPER_FAILED, String.format(

Review Comment:
   As below, use `CommonErrorCode.UNSUPPORTED_DATA_TYPE`



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import com.google.common.collect.Sets;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+@Slf4j
+public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final TDengineSourceConfig config;
+
+    private final Set<TDengineSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    private Connection conn;
+
+    public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
+        this.config = config;
+        this.sourceSplits = Sets.newHashSet();
+        this.context = readerContext;
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        synchronized (collector.getCheckpointLock()) {
+            sourceSplits.forEach(split -> {
+                try {
+                    read(split, collector);
+                } catch (Exception e) {
+                    throw new TDengineConnectorException(TDengineConnectorErrorCode.READER_FAILED, "TDengine split read error", e);

Review Comment:
   As below



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.buildSourceConfig;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceReader.Context;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.List;
+
+/**
+ * TDengine source: each split corresponds to one subtable
+ * <p>
+ * TODO: wait for optimization
+ * 1. batch -> batch + stream
+ * 2. one item of data writing -> a batch of data writing
+ */
+@AutoService(SeaTunnelSource.class)
+public class TDengineSource implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit, TDengineSourceState> {
+
+    private SeaTunnelRowType seaTunnelRowType;
+    private TDengineSourceConfig tdengineSourceConfig;
+
+    @Override
+    public String getPluginName() {
+        return "TDengine";
+    }
+
+    @SneakyThrows
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, DATABASE, STABLE, USERNAME, PASSWORD);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "TDengine connection require url/database/stable/username/password. All of these must not be empty.");

Review Comment:
   Use `SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED`
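
As an illustration of the validation step this comment refers to, here is a minimal sketch that reports which required options are missing instead of listing all of them. It is self-contained and does not use `CheckConfigUtil` or `SeaTunnelAPIErrorCode`. The class `RequiredOptionsSketch`, the `Map`-based config, and the literal key names (taken from the error message in the hunk above, not from `ConfigNames`) are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; the key names are assumed from the error message above.
class RequiredOptionsSketch {

    // Returns the required keys that are absent or blank, in the order given.
    static List<String> missingKeys(Map<String, String> config, String... requiredKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Throws with an error-code-style prefix and the list of missing keys.
    static void validate(Map<String, String> config) {
        List<String> missing = missingKeys(config, "url", "database", "stable", "username", "password");
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                "CONFIG_VALIDATION_FAILED: TDengine source is missing required options: "
                    + String.join(", ", missing));
        }
    }
}
```

In the connector itself the exception would presumably be the project's own type carrying the suggested error code; `IllegalArgumentException` is used here only to keep the sketch runnable on its own.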



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+
+@Slf4j
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        conn = DriverManager.getConnection(jdbcUrl);
+        try (Statement statement = conn.createStatement()) {
+            final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+            while (metaResultSet.next()) {
+                if (StringUtils.equals("TAG", metaResultSet.getString("note"))) {
+                    tagsNum++;
+                }
+            }
+        }
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            String sql = String.format("INSERT INTO %s using %s tags ( %s ) VALUES ( %s );",
+                element.getField(0),
+                config.getStable(),
+                tagValues,
+                StringUtils.join(convertDataType(metrics), ","));
+            final int rowCount = statement.executeUpdate(sql);
+            if (rowCount == 0) {
+                Throwables.propagateIfPossible(new TDengineConnectorException(TDengineConnectorErrorCode.SQL_OPERATION_FAILED, "insert error:" + element));
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (Objects.nonNull(conn)) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "TDengine writer connection close failed", e);

Review Comment:
   Use `CommonErrorCode.WRITER_OPERATION_FAILED`



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import com.google.common.collect.Sets;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+@Slf4j
+public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final TDengineSourceConfig config;
+
+    private final Set<TDengineSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    private Connection conn;
+
+    public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
+        this.config = config;
+        this.sourceSplits = Sets.newHashSet();
+        this.context = readerContext;
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        synchronized (collector.getCheckpointLock()) {
+            sourceSplits.forEach(split -> {
+                try {
+                    read(split, collector);
+                } catch (Exception e) {
+                    throw new TDengineConnectorException(TDengineConnectorErrorCode.READER_FAILED, "TDengine split read error", e);
+                }
+            });
+        }
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded TDengine source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public void open(){
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        //todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true",
+        // there is a exception : Caused by: java.sql.SQLException: can't create connection with server
+        // under docker network env
+        // @bobo (tdengine)
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false");
+        try {
+            conn = DriverManager.getConnection(jdbcUrl, connProps);
+        } catch (SQLException e) {
+            throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "get TDengine connection failed:" + jdbcUrl);

Review Comment:
   Use `CommonErrorCode.READER_OPERATION_FAILED`
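
A minimal sketch of what the suggested change could look like. In the quoted hunk the caught `SQLException` is not passed on as the cause, and the message includes `jdbcUrl`, which was built with the password. `ErrorCode` and `ConnectorException` below are hypothetical stand-ins, not the real SeaTunnel `CommonErrorCode` or `TDengineConnectorException` classes, whose constructors are not shown in this thread.

```java
import java.sql.SQLException;

// Sketch only: every type here is a hypothetical stand-in, not SeaTunnel API.
final class ConnectorErrorSketch {

    // Stand-in for an error-code enum such as the CommonErrorCode named in the review.
    enum ErrorCode { READER_OPERATION_FAILED }

    // Stand-in for a connector exception that carries an error code and a cause.
    static final class ConnectorException extends RuntimeException {
        final ErrorCode code;

        ConnectorException(ErrorCode code, String message, Throwable cause) {
            super(code + ": " + message, cause);
            this.code = code;
        }
    }

    // Wraps the SQLException, keeps it as the cause, and reports only the
    // URL and database so that the credentials stay out of the message.
    static ConnectorException wrap(SQLException e, String url, String database) {
        return new ConnectorException(
                ErrorCode.READER_OPERATION_FAILED,
                "get TDengine connection failed: " + url + database,
                e);
    }

    public static void main(String[] args) {
        ConnectorException ex = wrap(new SQLException("refused"), "<url>", "<database>");
        System.out.println(ex.getMessage() + " (cause: " + ex.getCause().getMessage() + ")");
    }
}
```

Keeping the cause preserves the driver's stack trace for whoever debugs the failed connection.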



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r984404214


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+
+import com.google.common.collect.Sets;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
+    private final TDengineSourceConfig config;
+    private Set<TDengineSourceSplit> pendingSplit = Sets.newConcurrentHashSet();
+    private Set<TDengineSourceSplit> assignedSplit = Sets.newConcurrentHashSet();
+
+    public TDengineSourceSplitEnumerator(TDengineSourceConfig config, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this.config = config;
+        this.context = context;
+    }
+
+    public TDengineSourceSplitEnumerator(TDengineSourceConfig config, TDengineSourceState sourceState, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this(config, context);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return tp.hashCode() % numReaders;
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void run() {
+        pendingSplit = getAllSplit();
+        assignSplit(context.registeredReaders());
+    }
+
+    /**
+     * split the time range into numPartitions parts if numPartitions is 1, use the whole time range if numPartitions < (end - start), use (start-end) partitions
+     * <p>
+     * eg: start = 1, end = 10, numPartitions = 2 sql = "select * from test"
+     * <p>
+     * split result
+     * <p>
+     * split 1: select * from test  where (time >= 1 and time < 6)
+     * <p>
+     * split 2: select * from test  where (time >= 6 and time < 11)
+     */
+    private Set<TDengineSourceSplit> getAllSplit() {
+        String sql = "select * from " + config.getStable();
+        Set<TDengineSourceSplit> sourceSplits = Sets.newHashSet();
+        // no need numPartitions, use one partition
+        if (config.getPartitionsNum() <= 1) {
+            sourceSplits.add(new TDengineSourceSplit("0", sql));
+            return sourceSplits;
+        }
+        long start = config.getLowerBound();
+        long end = config.getUpperBound();
+        int partitionsNum = config.getPartitionsNum();
+        if (end - start < partitionsNum) {
+            partitionsNum = (int) (end - start);
+        }
+        int size = (int) (end - start) / partitionsNum;
+        long currentStart = start;
+        int i = 0;
+        while (i < partitionsNum) {
+            //Left closed right away
+            long currentEnd = i == partitionsNum - 1 ? end + 1 : currentStart + size;
+            String query = " where ts >= '" + Instant.ofEpochMilli(currentStart).atZone(ZoneId.systemDefault()).format(FORMATTER) + "' and ts < '" + Instant.ofEpochMilli(currentEnd).atZone(ZoneId.systemDefault()).format(FORMATTER) + "'";
+            String finalSQL = sql + query;
+            sourceSplits.add(new TDengineSourceSplit(String.valueOf(i + System.nanoTime()), finalSQL));
+            i++;
+            currentStart += size;
+        }
+        return sourceSplits;
+    }
+
+    @Override
+    public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> taskIDList) {

Review Comment:
   Is `context.currentParallelism()` the number of readers?
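
Whichever value ends up being used as the reader count, the `getSplitOwner` helper in the hunk above computes `tp.hashCode() % numReaders`, which is negative whenever the hash code is negative. A minimal sketch of a non-negative variant follows; the class and method names are hypothetical, and the body of `assignSplit` is not visible in this thread, so how it calls the helper is an assumption.

```java
// Sketch only: SplitOwnerSketch is a hypothetical name, not part of the PR.
final class SplitOwnerSketch {

    // Maps a split id to a reader index in [0, numReaders).
    // Masking the sign bit keeps the result non-negative even when
    // String.hashCode() returns a negative value.
    static int splitOwner(String splitId, int numReaders) {
        if (numReaders <= 0) {
            throw new IllegalArgumentException("numReaders must be positive");
        }
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    public static void main(String[] args) {
        for (String id : new String[] {"0", "split-a", "polygenelubricants"}) {
            System.out.println(id + " -> reader " + splitOwner(id, 3));
        }
    }
}
```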





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1005305614


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Properties;
+
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tagsNum = pluginConfig.hasPath("tags") ? pluginConfig.getObject("tags").size() : 0;
+
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+        conn = DriverManager.getConnection(jdbcUrl, connProps);
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement()) {
+            String sql = "INSERT INTO " + element.getField(0)

Review Comment:
   The compiler already optimizes the `+` operator on Strings automatically (javac emits `StringBuilder` calls, and `invokedynamic` since JDK 9), so we don't need to use `StringBuilder` or `StringBuffer` manually here.
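
As a small illustration of the point about concatenation, the two methods below build the same string. The class name, method names and the shape of the statement are made up for the example and are not taken from the PR.

```java
// Sketch only: ConcatSketch and the SQL shape are illustrative assumptions.
final class ConcatSketch {

    // Single-expression concatenation; the compiler handles the buffering.
    static String withPlus(String table, String values) {
        return "INSERT INTO " + table + " VALUES (" + values + ")";
    }

    // The hand-written equivalent, which yields exactly the same string.
    static String withBuilder(String table, String values) {
        return new StringBuilder()
                .append("INSERT INTO ")
                .append(table)
                .append(" VALUES (")
                .append(values)
                .append(")")
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(withPlus("t1", "1, 2").equals(withBuilder("t1", "1, 2")));
    }
}
```

An explicit `StringBuilder` is still worth using when a string is assembled across loop iterations, because each `+=` in a loop creates a new intermediate string.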





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1291651439

   > > can you add e2e testcase?
   > > new version e2e: https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e
   > > OR
   > > old version e2e: https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e
   > 
   > do you need help?
   
   Thanks for reminding me. I will do this later.




[GitHub] [incubator-seatunnel] lhyundeadsoul closed pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul closed pull request #2832: [Feature][Connector-V2] add tdengine source
URL: https://github.com/apache/incubator-seatunnel/pull/2832




[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1298113317

   Retry CI




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r982419074


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+
+import com.google.common.collect.Sets;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
+    private final TDengineSourceConfig config;
+    private Set<TDengineSourceSplit> pendingSplit = Sets.newConcurrentHashSet();
+    private Set<TDengineSourceSplit> assignedSplit = Sets.newConcurrentHashSet();
+
+    public TDengineSourceSplitEnumerator(TDengineSourceConfig config, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this.config = config;
+        this.context = context;
+    }
+
+    public TDengineSourceSplitEnumerator(TDengineSourceConfig config, TDengineSourceState sourceState, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this(config, context);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return tp.hashCode() % numReaders;
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void run() {
+        pendingSplit = getAllSplit();
+        assignSplit(context.registeredReaders());
+    }
+
+    /**
+     * split the time range into numPartitions parts if numPartitions is 1, use the whole time range if numPartitions < (end - start), use (start-end) partitions
+     * <p>
+     * eg: start = 1, end = 10, numPartitions = 2 sql = "select * from test"
+     * <p>
+     * split result
+     * <p>
+     * split 1: select * from test  where (time >= 1 and time < 6)
+     * <p>
+     * split 2: select * from test  where (time >= 6 and time < 11)
+     */
+    private Set<TDengineSourceSplit> getAllSplit() {
+        String sql = "select * from " + config.getStable();
+        Set<TDengineSourceSplit> sourceSplits = Sets.newHashSet();
+        // no need numPartitions, use one partition
+        if (config.getPartitionsNum() <= 1) {
+            sourceSplits.add(new TDengineSourceSplit("0", sql));
+            return sourceSplits;
+        }
+        long start = config.getLowerBound();
+        long end = config.getUpperBound();
+        int partitionsNum = config.getPartitionsNum();
+        if (end - start < partitionsNum) {
+            partitionsNum = (int) (end - start);
+        }
+        int size = (int) (end - start) / partitionsNum;
+        long currentStart = start;
+        int i = 0;
+        while (i < partitionsNum) {
+            //Left closed right away
+            long currentEnd = i == partitionsNum - 1 ? end + 1 : currentStart + size;
+            String query = " where ts >= '" + Instant.ofEpochMilli(currentStart).atZone(ZoneId.systemDefault()).format(FORMATTER) + "' and ts < '" + Instant.ofEpochMilli(currentEnd).atZone(ZoneId.systemDefault()).format(FORMATTER) + "'";
+            String finalSQL = sql + query;
+            sourceSplits.add(new TDengineSourceSplit(String.valueOf(i + System.nanoTime()), finalSQL));
+            i++;
+            currentStart += size;
+        }
+        return sourceSplits;
+    }
+
+    @Override
+    public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    private void assignSplit(Collection<Integer> taskIDList) {

Review Comment:
   Please refer to the new version of the code, which fixes some bugs:
   
   https://github.com/apache/incubator-seatunnel/pull/2917



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import com.google.common.collect.Sets;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+@Slf4j
+public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final TDengineSourceConfig config;
+
+    private final Set<TDengineSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    private Connection conn;
+
+    public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
+        this.config = config;
+        this.sourceSplits = Sets.newHashSet();
+        this.context = readerContext;
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {

Review Comment:
   Please refer to the new version of the code, which fixes some bugs:
   
   https://github.com/apache/incubator-seatunnel/pull/2917



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+
+import com.google.auto.service.AutoService;
+import com.google.common.base.Throwables;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+@AutoService(SeaTunnelSink.class)
+public class TDengineSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private static final String RULES = "rules";
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
+        return new TDengineSinkWriter(seaTunnelRowType);
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) {
+        if (!pluginConfig.hasPath(RULES)) {
+            Throwables.propagateIfPossible(new ConfigException.Missing(RULES));
+        }
+
+        List<? extends Config> configList = pluginConfig.getConfigList(RULES);
+        if (CollectionUtils.isEmpty(configList)) {
+            Throwables.propagateIfPossible(new ConfigException.BadValue(RULES, "Assert rule config is empty, please add rule config."));
+        }
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Assert";

Review Comment:
   Should `getPluginName()` return `TDengine` instead of `Assert`?



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
+
+import com.google.common.collect.Sets;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
+    private final TDengineSourceConfig config;
+    private Set<TDengineSourceSplit> pendingSplit = Sets.newConcurrentHashSet();
+    private Set<TDengineSourceSplit> assignedSplit = Sets.newConcurrentHashSet();
+
+    public TDengineSourceSplitEnumerator(TDengineSourceConfig config, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this.config = config;
+        this.context = context;
+    }
+
+    public TDengineSourceSplitEnumerator(TDengineSourceConfig config, TDengineSourceState sourceState, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
+        this(config, context);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return tp.hashCode() % numReaders;
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void run() {
+        pendingSplit = getAllSplit();
+        assignSplit(context.registeredReaders());
+    }
+
+    /**
+     * split the time range into numPartitions parts if numPartitions is 1, use the whole time range if numPartitions < (end - start), use (start-end) partitions
+     * <p>
+     * eg: start = 1, end = 10, numPartitions = 2 sql = "select * from test"
+     * <p>
+     * split result
+     * <p>
+     * split 1: select * from test  where (time >= 1 and time < 6)
+     * <p>
+     * split 2: select * from test  where (time >= 6 and time < 11)
+     */
+    private Set<TDengineSourceSplit> getAllSplit() {

Review Comment:
   Please refer to the new version of the code, which fixes some bugs:
   
   https://github.com/apache/incubator-seatunnel/pull/2917
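
For reference, the range-splitting idea described in the Javadoc above can be sketched as follows. This is not the code from the referenced pull request, whose contents are not shown in this thread; the class and method names are hypothetical. It keeps the arithmetic in `long` and never divides by zero, whereas the quoted `getAllSplit` casts `end - start` to `int` before dividing and ends up with `partitionsNum == 0` when `start == end`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: TimeRangeSplitSketch is a hypothetical name, not part of the PR.
final class TimeRangeSplitSketch {

    // Splits the inclusive range [lower, upper] into at most `partitions`
    // half-open ranges {start, end}, meaning start <= ts < end.
    // Assumes upper - lower + 1 does not overflow a long.
    static List<long[]> split(long lower, long upper, int partitions) {
        if (upper < lower) {
            throw new IllegalArgumentException("upper must be >= lower");
        }
        long span = upper - lower + 1;                  // number of values covered
        int n = (int) Math.max(1, Math.min(partitions, span));
        long size = span / n;                           // width of all but the last range
        List<long[]> ranges = new ArrayList<>();
        long currentStart = lower;
        for (int i = 0; i < n; i++) {
            // The last range absorbs the remainder and ends just past `upper`.
            long currentEnd = (i == n - 1) ? upper + 1 : currentStart + size;
            ranges.add(new long[] {currentStart, currentEnd});
            currentStart = currentEnd;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] r : split(1, 10, 2)) {
            System.out.println("ts >= " + r[0] + " and ts < " + r[1]);
        }
    }
}
```

With `lower = 1`, `upper = 10` and two partitions this yields the two ranges from the Javadoc example, `[1, 6)` and `[6, 11)`.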



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import java.io.IOException;
+
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    public TDengineSinkWriter(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {

Review Comment:
   Is `write` not implemented yet?





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1003938749


##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,107 @@
+# TDengine

Review Comment:
   Users mostly look for usage documentation in the sink list rather than the source list:
   
   https://seatunnel.apache.org/docs/category/sink-v2



##########
seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
##########
@@ -57,6 +56,26 @@
             <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>

Review Comment:
   Please do not submit example module files.



##########
seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/tdengine_to_tdengine.conf:
##########
@@ -0,0 +1,86 @@
+#

Review Comment:
   Please do not submit example module files.



##########
seatunnel-connectors-v2-dist/pom.xml:
##########
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   why close?





[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#issuecomment-1289898788

   > can you add e2e testcase?
   > 
   > new version e2e: https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e
   > 
   > OR
   > 
   > old version e2e: https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e
   
   Do you need help?




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1063938913


##########
docs/en/connector-v2/sink/TDengine.md:
##########
@@ -0,0 +1,69 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine. You need to create stable before running seatunnel task
+
+## Key features
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)

Review Comment:
   According to docs/en/concept/connector-v2-features.md, a sink has only two features: exactly-once and cdc.



##########
docs/en/connector-v2/source/TDengine.md:
##########
@@ -0,0 +1,82 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read external data source data through TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)

Review Comment:
   According to docs/en/concept/connector-v2-features.md, schema projection is now called column projection.



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import com.google.common.collect.Sets;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+@Slf4j
+public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final TDengineSourceConfig config;
+
+    private final Set<TDengineSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    private Connection conn;
+
+    public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
+        this.config = config;
+        this.sourceSplits = Sets.newHashSet();
+        this.context = readerContext;
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        sourceSplits.forEach(split -> {

Review Comment:
   While reading a split and changing state (`sourceSplits`), you need to hold the checkpoint lock; otherwise exactly-once cannot be guaranteed.



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import com.google.common.collect.Sets;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+@Slf4j
+public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final TDengineSourceConfig config;
+
+    private final Set<TDengineSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    private Connection conn;
+
+    public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
+        this.config = config;
+        this.sourceSplits = Sets.newHashSet();
+        this.context = readerContext;
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        sourceSplits.forEach(split -> {

Review Comment:
   For example:
   ```
   synchronized (collector.getCheckpointLock()) {
       sourceSplits.forEach(split -> {
           read(split, collector);
       });
   }
   ```
   Alternatively, refer to JdbcSourceReader for a smaller lock granularity.
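
One possible reading of "smaller lock granularity" is to take the lock once per split instead of around the whole loop, so that emitting a split's rows and removing that split from the pending state happen together. The sketch below is only an illustration under that assumption: `RowCollector`, `ListCollector`, and `pollAll` are hypothetical stand-ins, not the SeaTunnel interfaces and not the JdbcSourceReader code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CheckpointLockSketch {

    /** Hypothetical stand-in for the collector a reader receives (not the real SeaTunnel type). */
    interface RowCollector<T> {
        void collect(T row);

        Object getCheckpointLock();
    }

    /** In-memory collector used only by this sketch. */
    static class ListCollector<T> implements RowCollector<T> {
        final List<T> rows = new ArrayList<>();
        private final Object lock = new Object();

        @Override
        public void collect(T row) {
            rows.add(row);
        }

        @Override
        public Object getCheckpointLock() {
            return lock;
        }
    }

    /**
     * Emits every pending split. The lock is held per split, so a snapshot taken
     * under the same lock never sees a split that is half emitted.
     */
    static <T> void pollAll(List<List<T>> pendingSplits, RowCollector<T> collector) {
        Iterator<List<T>> it = pendingSplits.iterator();
        while (it.hasNext()) {
            List<T> split = it.next();
            synchronized (collector.getCheckpointLock()) {
                for (T row : split) {
                    collector.collect(row);
                }
                // State change happens inside the same critical section as the emit.
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<List<String>> pending = new ArrayList<>();
        pending.add(Arrays.asList("a", "b"));
        pending.add(Arrays.asList("c"));
        ListCollector<String> collector = new ListCollector<>();
        pollAll(pending, collector);
        System.out.println(collector.rows + " pending=" + pending.size());
    }
}
```

The trade-off is that a checkpoint can be taken between splits rather than only after the whole loop, at the cost of acquiring the lock more often.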



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class TDengineSourceConfig implements Serializable {
+
+    /**
+     * jdbc:TAOS-RS://localhost:6041/
+     */
+    private String url;
+    private String username;
+    private String password;
+    private String database;
+    private String stable;
+    //param of timezone in 'jdbc:TAOS-RS' just effect on taosadapter side, other than the JDBC client side
+    //so this param represent the server-side timezone setting up
+    private String timezone;
+    private String lowerBound;
+    private String upperBound;
+    private List<String> fields;
+    private List<String> tags;
+
+    public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
+        TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
+        tdengineSourceConfig.setUrl(pluginConfig.hasPath(URL) ? pluginConfig.getString(URL) : null);

Review Comment:
   It would be better to add a config check.
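
As a rough illustration of what such a check could look like, the sketch below validates required options before the config object is built. It uses a plain `Map` rather than the shaded typesafe `Config`, and the helper names (`missingKeys`, `checkRequired`) as well as the choice of which options are required are assumptions made for this example only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigCheckSketch {

    /** Returns the required keys that are absent or blank in the given config. */
    static List<String> missingKeys(Map<String, String> config, List<String> requiredKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Fails fast with a message that names every missing option. */
    static void checkRequired(Map<String, String> config, List<String> requiredKeys) {
        List<String> missing = missingKeys(config, requiredKeys);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }
    }

    public static void main(String[] args) {
        // Option names taken from the e2e config in this PR; treating them as required is an assumption.
        List<String> required = Arrays.asList("url", "username", "password", "database", "stable");
        Map<String, String> config = new HashMap<>();
        config.put("url", "jdbc:TAOS-RS://localhost:6041/");
        config.put("username", "root");
        config.put("password", "taosdata");
        config.put("database", "power");
        // "stable" is left out on purpose to show the failure path.
        try {
            checkRequired(config, required);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```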



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.taosdata.jdbc.TSDBDriver;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Properties;
+
+public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Connection conn;
+    private final TDengineSourceConfig config;
+    private int tagsNum;
+
+    @SneakyThrows
+    public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tagsNum = pluginConfig.hasPath("tags") ? pluginConfig.getObject("tags").size() : 0;
+
+        config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+        String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+        Properties connProps = new Properties();
+        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
+        conn = DriverManager.getConnection(jdbcUrl, connProps);
+    }
+
+    @SneakyThrows
+    @Override
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public void write(SeaTunnelRow element) {
+        final ArrayList<Object> tags = Lists.newArrayList();
+        for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
+            tags.add(element.getField(i));
+        }
+        final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ",");
+
+        final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum);
+
+        try (Statement statement = conn.createStatement()) {
+            String sql = "INSERT INTO " + element.getField(0)
+                + " using " + config.getStable()
+                + " tags ( "
+                + tagValues
+                + " ) VALUES ("
+                + StringUtils.join(convertDataType(metrics), ",")
+                + ");";
+            final int rowCount = statement.executeUpdate(sql);

Review Comment:
   Whether to support `write(List<SeaTunnelRow>)` still needs to be discussed. Currently SeaTunnel only supports `write(SeaTunnelRow)`, so if you need better performance you have to implement batch flushing yourself.
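
A minimal sketch of the batch-flush idea, assuming rows are buffered in memory and handed to a flush callback once a size threshold is reached. `BatchingWriterSketch`, its `batchSize` parameter, and the `Consumer`-based flusher are hypothetical names for this example; in the real writer the callback would build and execute one multi-row statement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchingWriterSketch<T> {

    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    BatchingWriterSketch(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Buffers one row and flushes as soon as the buffer reaches the batch size. */
    void write(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands a copy of the buffered rows to the flusher, then clears the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Flushes whatever is left so no buffered row is lost on shutdown. */
    void close() {
        flush();
    }

    public static void main(String[] args) {
        BatchingWriterSketch<String> writer =
            new BatchingWriterSketch<>(2, batch -> System.out.println("flush " + batch));
        writer.write("r1");
        writer.write("r2");
        writer.write("r3");
        writer.close();
    }
}
```

Rows that are still buffered when a failure occurs would be lost unless the flush is also triggered at checkpoint time, which is worth considering alongside the exactly-once discussion above.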





[GitHub] [incubator-seatunnel] lhyundeadsoul commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
lhyundeadsoul commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1063943180


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class TDengineSourceConfig implements Serializable {
+
+    /**
+     * jdbc:TAOS-RS://localhost:6041/
+     */
+    private String url;
+    private String username;
+    private String password;
+    private String database;
+    private String stable;
+    //param of timezone in 'jdbc:TAOS-RS' just effect on taosadapter side, other than the JDBC client side
+    //so this param represent the server-side timezone setting up
+    private String timezone;
+    private String lowerBound;
+    private String upperBound;
+    private List<String> fields;
+    private List<String> tags;
+
+    public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
+        TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
+        tdengineSourceConfig.setUrl(pluginConfig.hasPath(URL) ? pluginConfig.getString(URL) : null);

Review Comment:
   This is already checked in `org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSource#prepare`.





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2832: [Feature][Connector-V2] add tdengine source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2832:
URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1062090069


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink.conf:
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 2
+  job.mode = "STREAMING"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+#   FakeSource {
+#     parallelism = 2
+#     result_table_name = "fake"
+#     row.num = 16
+#     schema = {
+#       fields {
+#         name = "string"
+#         age = "int"
+#       }
+#     }
+#   }
+    TDengine {
+      url : "jdbc:TAOS-RS://flink_e2e_tdengine_src:6041/"
+      username : "root"
+      password : "taosdata"
+      database : "power"
+      stable : "meters"
+      lower_bound : "2018-10-03 14:38:05.000"
+      upper_bound : "2018-10-03 14:38:16.801"
+      fields {
+        ts = "timestamp"
+        current = "float"
+        voltage = "int"
+        phase = "float"
+      }
+      tags {
+        location = "string"
+        groupid = "int"
+      }
+      result_table_name = "tdengine_result"
+    }
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+  sql {
+    sql = "select * from tdengine_result"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/category/transform
+}

Review Comment:
   The sql transform has been removed.
   
   ```suggestion
   transform {
   }
   ```


