Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/09 03:26:57 UTC

[GitHub] [incubator-seatunnel] 531651225 opened a new pull request, #2697: [Feature][Connector-V2] Add influxDB connector source

531651225 opened a new pull request, #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   Issue: https://github.com/apache/incubator-seatunnel/issues/1946
   Adds an InfluxDB source connector.
   Closes https://github.com/apache/incubator-seatunnel/pull/2595
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [x] Code changes are covered with tests, or they do not need tests for the following reason:
   * [x] If any new Jar binary package adding in your PR, please add License Notice according
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1250748683

   > > > > > > > > > can you add e2e-testcase(spark)?
   > > > > > > > > 
   > > > > > > > > 
   > > > > > > > > There is an okhttp jar conflict between Spark's bundled jars and the InfluxDB connector. For now, this is worked around by documenting that the conflicting jars in Spark's `jars` directory must be replaced.
   > > > > > > > 
   > > > > > > > 
   > > > > > > > @CalvinKirs PTAL
   > > > > > > 
   > > > > > > 
   > > > > > > We'd better be able to rename these jars @TyrantLucifer WDYT?
   > > > > > 
   > > > > > 
   > > > > > Yes, agree with @CalvinKirs. You can add the `maven-shade` plugin to your pom to rename (relocate) these packages. BTW, could you please use `httpclient` instead of `okhttp`? It's better to unify the versions of our HTTP dependencies. In later iterations, we need to abstract this HTTP-related functionality into a common module and replace it uniformly with the renamed httpclient. What do you think? @531651225 @CalvinKirs
   > > > > 
   > > > > 
   > > > > <img alt="screenshot" width="420" src="https://user-images.githubusercontent.com/33744252/190942867-13f2ca43-59d8-425b-aa10-e2c29817821d.png">
   > > > > It doesn't seem possible to replace okhttp with httpclient: okhttp is used internally by influxdb-java, and influxdb-java is the only officially provided way to access InfluxDB from Java.
   > > > 
   > > > 
   > > > This is really bad news. But don't worry, we still have the ultimate weapon: the `maven-shade` plugin. We can use it to relocate (rename) the okhttp packages and avoid the jar conflict.
   > > 
   > > 
   > > Thanks for your advice. I'll try. By the way, are there any other connectors that already use the shade plugin?
   > 
   > You can refer to https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html or see the `seatunnel-config` module for an example of how the shade plugin is used.
   
   Thanks, the conflict has been resolved.
   I have also added the Spark E2E test. Please help review, @hailin0 @CalvinKirs @EricJoy2048 @TyrantLucifer




[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r974136689


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf:
##########
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "f5"
+        fields {
+            label = STRING
+            f1 = STRING
+            f2 = DOUBLE
+            f3 = BIGINT
+            f4 = FLOAT
+            f5 = INT
+            f6 = SMALLINT
+            f7 = BOOLEAN

Review Comment:
   Thank you for your suggestion, but at present InfluxDB field values only support four data types: strings, floats, integers, and booleans.
   
   Details: https://docs.influxdata.com/influxdb/v1.8/concepts/glossary/#field-value
   <img width="870" alt="screenshot" src="https://user-images.githubusercontent.com/33744252/191006391-71aef1ec-8081-4490-93c2-f33a7668b422.png">
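Since InfluxDB stores only four field types while the test config declares SeaTunnel types such as INT, SMALLINT and FLOAT, each raw value has to be narrowed to its declared type somewhere. The Java sketch below illustrates that idea only; it is not the connector's converter. The class and method names are hypothetical, and it assumes numeric values arrive as `java.lang.Number` (for example `Double` when decoded from JSON).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the connector: narrows a raw InfluxDB field
// value to the SeaTunnel type name declared in the `fields` block.
class InfluxValueConverter {

    static Object convert(Object raw, String declaredType) {
        if (raw == null) {
            return null; // a missing field stays null
        }
        switch (declaredType) {
            case "STRING":
                return raw.toString();
            case "BOOLEAN":
                // assumption: booleans may also arrive as text
                return raw instanceof Boolean ? raw : Boolean.valueOf(raw.toString());
            case "DOUBLE":
                return ((Number) raw).doubleValue();
            case "FLOAT":
                return ((Number) raw).floatValue();
            case "BIGINT":
                return ((Number) raw).longValue();
            case "INT":
                return ((Number) raw).intValue();
            case "SMALLINT":
                return ((Number) raw).shortValue();
            default:
                throw new IllegalArgumentException("Unsupported type: " + declaredType);
        }
    }

    public static void main(String[] args) {
        // One row shaped like the e2e query result, with integers decoded as Double.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("f3", 1234567890123.0);
        row.put("f5", 42.0);
        row.put("f7", true);
        System.out.println(convert(row.get("f3"), "BIGINT"));  // 1234567890123
        System.out.println(convert(row.get("f5"), "INT"));     // 42
        System.out.println(convert(row.get("f7"), "BOOLEAN")); // true
    }
}
```

One caveat of this approach: if integers really do pass through `Double`, BIGINT values above 2^53 lose precision before narrowing, which is one more reason to cover the full range of each declared type in the e2e data.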





[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1246156593

   can you add e2e-testcase(spark)?




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r980704699


##########
docs/en/connector-v2/source/InfluxDB.md:
##########
@@ -0,0 +1,170 @@
+# InfluxDB
+
+> InfluxDB source connector
+
+## Description
+
+Read external data source data through InfluxDB.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+
+## Options
+
+| name        | type    | required | default value |
+|-------------|---------|----------|---------------|
+| url         | string  | yes      | -             |
+| sql         | string  | yes      | -             |
+| fields                     | config  | yes      | -             |
+| database            | string  | yes      |               |
+| username    | string  | no       | -             |
+| password    | string  | no       | -             |
+| lower_bound | long    | no       | -             |
+| upper_bound | long    | no       | -             |
+| partition_num | int     | no       | -             |
+| split_column | string     | no       | -             |
+| epoch | string     | no       | n             |
+| connect_timeout_ms | long     | no       | 15000         |
+| query_timeout_sec | int     | no       | 3             |

Review Comment:
   check style
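The options table in this diff mixes required keys (`url`, `sql`, `fields`, `database`) with defaulted ones (`epoch` = `n`, `connect_timeout_ms` = 15000, `query_timeout_sec` = 3). As a hedged illustration of those documented semantics only, here is a hypothetical Java holder that applies them to an already-parsed key/value map; it is not the connector's real `InfluxDBConfig`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder mirroring the documented options table; not the
// connector's real InfluxDBConfig.
class InfluxDBOptionsSketch {
    final String url;
    final String sql;
    final String database;
    final String epoch;
    final long connectTimeoutMs;
    final int queryTimeoutSec;

    InfluxDBOptionsSketch(Map<String, Object> raw) {
        // url, sql, fields and database are marked "required" in the table.
        for (String key : List.of("url", "sql", "fields", "database")) {
            if (!raw.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        this.url = (String) raw.get("url");
        this.sql = (String) raw.get("sql");
        this.database = (String) raw.get("database");
        // Documented defaults: epoch = n, connect_timeout_ms = 15000, query_timeout_sec = 3.
        this.epoch = (String) raw.getOrDefault("epoch", "n");
        this.connectTimeoutMs = ((Number) raw.getOrDefault("connect_timeout_ms", 15000L)).longValue();
        this.queryTimeoutSec = ((Number) raw.getOrDefault("query_timeout_sec", 3)).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("url", "http://influxdb-host:8086");
        raw.put("sql", "select label, f1 from test");
        raw.put("fields", Map.of("label", "STRING", "f1", "STRING"));
        raw.put("database", "test");
        InfluxDBOptionsSketch opts = new InfluxDBOptionsSketch(raw);
        System.out.println(opts.epoch + " " + opts.connectTimeoutMs + " " + opts.queryTimeoutSec); // n 15000 3
    }
}
```

Failing fast on a missing required key keeps a misconfigured job from reaching the InfluxDB client at all.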
   





[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r974137812


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf:
##########
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "f5"
+        fields {
+            label = STRING
+            f1 = STRING
+            f2 = DOUBLE
+            f3 = BIGINT
+            f4 = FLOAT
+            f5 = INT
+            f6 = SMALLINT
+            f7 = BOOLEAN

Review Comment:
   > test all data types?
   > 
   > reference https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/src/test/resources/iceberg/iceberg_source.conf#L30
   
   > Thank you for your suggestion, but at present InfluxDB field values only support four data types: strings, floats, integers, and booleans.
   > 
   > Details: https://docs.influxdata.com/influxdb/v1.8/concepts/glossary/#field-value <img alt="screenshot" width="870" src="https://user-images.githubusercontent.com/33744252/191006391-71aef1ec-8081-4490-93c2-f33a7668b422.png">





[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1251807976

   @531651225, is this PR ready for another review?
   If so, @hailin0 @TyrantLucifer @Hisoka-X, please review this PR again.




[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1250595889

   > > > > > > > > can you add e2e-testcase(spark)?
   > > > > > > > 
   > > > > > > > 
   > > > > > > > There is an okhttp jar conflict between Spark's bundled jars and the InfluxDB connector. For now, this is worked around by documenting that the conflicting jars in Spark's `jars` directory must be replaced.
   > > > > > > 
   > > > > > > 
   > > > > > > @CalvinKirs PTAL
   > > > > > 
   > > > > > 
   > > > > > We'd better be able to rename these jars @TyrantLucifer WDYT?
   > > > > 
   > > > > 
   > > > > Yes, agree with @CalvinKirs. You can add the `maven-shade` plugin to your pom to rename (relocate) these packages. BTW, could you please use `httpclient` instead of `okhttp`? It's better to unify the versions of our HTTP dependencies. In later iterations, we need to abstract this HTTP-related functionality into a common module and replace it uniformly with the renamed httpclient. What do you think? @531651225 @CalvinKirs
   > > > 
   > > > 
   > > > <img alt="screenshot" width="420" src="https://user-images.githubusercontent.com/33744252/190942867-13f2ca43-59d8-425b-aa10-e2c29817821d.png">
   > > > It doesn't seem possible to replace okhttp with httpclient: okhttp is used internally by influxdb-java, and influxdb-java is the only officially provided way to access InfluxDB from Java.
   > > 
   > > 
   > > This is really bad news. But don't worry, we still have the ultimate weapon: the `maven-shade` plugin. We can use it to relocate (rename) the okhttp packages and avoid the jar conflict.
   > 
   > Thanks for your advice. I'll try. By the way, are there any other connectors that already use the shade plugin?
   
   You can refer to https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html or see the `seatunnel-config` module for an example of how the shade plugin is used.
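To make the relocation idea concrete: a maven-shade `<relocation>` rewrites class references inside the shaded jar, so the connector's copy of okhttp lives under a new package and no longer collides with the okhttp on Spark's classpath. The Java sketch below only mimics that name mapping on strings. The shaded prefix `org.apache.seatunnel.shade.okhttp3.` is a hypothetical example, and it assumes influxdb-java depends on the `okhttp3` package.

```java
// Illustrates the name mapping a shade relocation applies; the real plugin
// rewrites bytecode references, this only rewrites class-name strings.
class RelocationSketch {
    // Hypothetical pattern/shadedPattern pair, as a pom <relocation> might declare.
    static final String PATTERN = "okhttp3.";
    static final String SHADED_PATTERN = "org.apache.seatunnel.shade.okhttp3.";

    static String relocate(String className) {
        // Matching on the package prefix including the trailing dot keeps
        // look-alike packages (e.g. "okhttp3x.Foo") untouched.
        return className.startsWith(PATTERN)
                ? SHADED_PATTERN + className.substring(PATTERN.length())
                : className;
    }

    public static void main(String[] args) {
        System.out.println(relocate("okhttp3.OkHttpClient"));  // org.apache.seatunnel.shade.okhttp3.OkHttpClient
        System.out.println(relocate("org.influxdb.InfluxDB")); // org.influxdb.InfluxDB
    }
}
```

Because influxdb-java's own references are rewritten along with the okhttp classes, the connector keeps working while Spark continues to load its original okhttp.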




[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1246305483

   > can you add e2e-testcase(spark)?
   
   There is an okhttp jar conflict between Spark's bundled jars and the InfluxDB connector.
   For now, this is worked around by documenting that the conflicting jars in Spark's `jars` directory must be replaced.
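When a jar conflict like this is suspected, one quick way to confirm which jar a class is actually loaded from is to ask the JVM for the class's code source. A minimal, hypothetical helper using only standard JDK APIs; `okhttp3.OkHttpClient` is assumed to be the class in question.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic helper: reports which jar/directory a class was loaded from.
class ClassOrigin {

    static Optional<String> locate(String className) {
        try {
            // initialize=false: load the class without running its static initializers
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // JDK bootstrap classes (e.g. java.lang.String) report no code source.
            return Optional.ofNullable(source).map(CodeSource::getLocation).map(URL::toString);
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "okhttp3.OkHttpClient";
        System.out.println(name + " -> " + locate(name).orElse("not found (or JDK class)"));
    }
}
```

Running it on the driver or executor classpath shows whether okhttp resolves to Spark's `jars` directory or to the connector jar.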




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r974031799


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf:
##########
@@ -0,0 +1,180 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "f5"
+        fields {
+            label = STRING

Review Comment:
   as above



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf:
##########
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "f5"
+        fields {
+            label = STRING
+            f1 = STRING
+            f2 = DOUBLE
+            f3 = BIGINT
+            f4 = FLOAT
+            f5 = INT
+            f6 = SMALLINT
+            f7 = BOOLEAN

Review Comment:
   test all data types?
   
   reference
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/src/test/resources/iceberg/iceberg_source.conf#L30





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r987566606


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+    final InfluxDBConfig config;
+    private final Context<InfluxDBSourceSplit> context;
+    private final Map<Integer, List<InfluxDBSourceSplit>> pendingSplit;
+    private final Object stateLock = new Object();
+    private volatile boolean shouldEnumerate;
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+        this(context, null, config);
+    }
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+        this.context = context;
+        this.config = config;
+        this.pendingSplit = new HashMap<>();
+        this.shouldEnumerate = sourceState == null;
+        if (sourceState != null) {
+            this.shouldEnumerate = sourceState.isShouldEnumerate();
+            this.pendingSplit.putAll(sourceState.getPendingSplit());
+        }
+    }
+
+    @Override
+    public void run() {
+        Set<Integer> readers = context.registeredReaders();
+        if (shouldEnumerate) {
+            Set<InfluxDBSourceSplit> newSplits = getInfluxDBSplit();
+
+            synchronized (stateLock) {

Review Comment:
   @ashulin @hailin0   PTAL
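For context when reviewing `getInfluxDBSplit()`: the options in the e2e config (`split_column = "f5"`, `lower_bound = 1`, `upper_bound = 100`, `partition_num = 4`) suggest range partitioning. The sketch below shows one way to turn those values into non-overlapping predicates. It is an illustration under assumptions, not the PR's code: the class name is hypothetical, both bounds are treated as inclusive, and appending a `WHERE` clause assumes the user's `sql` has no clauses of its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical range splitter; assumes the bounds are far enough from
// Long.MAX_VALUE that the arithmetic below cannot overflow.
class RangeSplitSketch {

    static List<String> splitPredicates(String column, long lower, long upper, int partitions) {
        if (partitions <= 0 || lower > upper) {
            throw new IllegalArgumentException("need partitions > 0 and lower <= upper");
        }
        long total = upper - lower + 1;                    // values in the inclusive range
        long step = (total + partitions - 1) / partitions; // ceiling division
        List<String> predicates = new ArrayList<>();
        for (long start = lower; start <= upper; start += step) {
            long end = Math.min(start + step - 1, upper);
            predicates.add(column + " >= " + start + " AND " + column + " <= " + end);
        }
        return predicates; // at most `partitions` entries, never overlapping
    }

    // Assumes `sql` has no WHERE, GROUP BY or LIMIT clause of its own.
    static String withPredicate(String sql, String predicate) {
        return sql + " WHERE " + predicate;
    }

    public static void main(String[] args) {
        String sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test";
        for (String p : splitPredicates("f5", 1, 100, 4)) {
            System.out.println(withPredicate(sql, p));
        }
    }
}
```

With the e2e values this prints four queries covering f5 ranges 1-25, 26-50, 51-75 and 76-100. When the range has fewer values than `partition_num`, it simply produces fewer splits rather than empty ones.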





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r983026387


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+    final InfluxDBConfig config;
+
+    private final Context<InfluxDBSourceSplit> context;
+    private Set<InfluxDBSourceSplit> pendingSplit;
+    private Set<InfluxDBSourceSplit> assignedSplit;
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+        this.context = context;
+        this.config = config;
+    }
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+        this(context, config);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+        this.assignedSplit = new HashSet<>();

Review Comment:
   
   
   reference
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java#L83
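This comment is anchored on `open()` setting `assignedSplit = new HashSet<>()`. Since the state-based constructor has already populated that field, the reset would appear to discard restored state. Without claiming to reproduce the referenced IoTDB enumerator, here is a hedged Java sketch of the general pattern: initialise state once in the constructor, keep `open()` free of resets, and pick each split's reader with a hash-modulo owner function. Splits are modelled as plain string ids, and every name here is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical enumerator state holder; splits are plain string ids.
class EnumeratorStateSketch {
    private final Set<String> assignedSplits;
    private final Map<Integer, List<String>> pendingSplits = new HashMap<>();

    // Fresh start: pass an empty set. Restore: pass the checkpointed set.
    EnumeratorStateSketch(Set<String> restoredAssigned) {
        this.assignedSplits = new HashSet<>(restoredAssigned);
    }

    void open() {
        // Deliberately does not reset assignedSplits, so restored state survives.
    }

    // Stable reader choice for a split: non-negative hash modulo reader count.
    static int splitOwner(String splitId, int numReaders) {
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    void addPending(Collection<String> splitIds, int numReaders) {
        for (String id : splitIds) {
            if (!assignedSplits.contains(id)) { // skip splits assigned before the restore
                pendingSplits.computeIfAbsent(splitOwner(id, numReaders), k -> new ArrayList<>()).add(id);
            }
        }
    }

    List<String> assignTo(int readerId) {
        List<String> splits = pendingSplits.remove(readerId);
        if (splits == null) {
            return Collections.emptyList();
        }
        assignedSplits.addAll(splits);
        return splits;
    }

    boolean isAssigned(String splitId) {
        return assignedSplits.contains(splitId);
    }

    public static void main(String[] args) {
        EnumeratorStateSketch e = new EnumeratorStateSketch(Set.of("split-0"));
        e.open();
        e.addPending(List.of("split-0", "split-1", "split-2"), 1);
        System.out.println(e.assignTo(0)); // [split-1, split-2]
    }
}
```

Keeping the owner function deterministic means a restarted job routes each remaining split to the same reader index it would have used originally.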





[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1250482987

   > > > > can you add e2e-testcase(spark)?
   > > > 
   > > > 
   > > > There is an okhttp jar conflict between Spark's bundled jars and the InfluxDB connector. For now, this is worked around by documenting that the conflicting jars in Spark's `jars` directory must be replaced.
   > > 
   > > 
   > > @CalvinKirs PTAL
   > 
   > We'd better be able to rename these jars @TyrantLucifer WDYT?
   
   Yes, agree with you. You can add the `maven-shade` plugin to your pom to rename (relocate) these packages. BTW, could you please use `httpclient` instead of `okhttp`? It's better to unify the versions of our HTTP dependencies. In later iterations, we need to abstract this HTTP-related functionality into a common module and replace it uniformly with the renamed httpclient. What do you think? @531651225 @CalvinKirs




[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r966723797


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf:
##########
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select * from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "value"
+        fields {
+            label = STRING
+            value = INT
+            rt = STRING
+            time = BIGINT

Review Comment:
   already fixed it
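The `lower_bound`, `upper_bound`, `partition_num`, and `split_column` options in the config above suggest range-based splitting on a numeric column. This diff does not show the enumerator's actual logic, so the following is only a sketch under that assumption: it divides the inclusive range into `partition_num` contiguous sub-ranges and renders each as an InfluxQL-style predicate. `RangeSplitSketch` and `splitPredicates` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split [lower, upper] on `column` into contiguous ranges.
class RangeSplitSketch {

    static List<String> splitPredicates(String column, long lower, long upper, int partitions) {
        if (partitions <= 0 || upper < lower) {
            throw new IllegalArgumentException("invalid bounds or partition count");
        }
        long total = upper - lower + 1;       // number of values in the inclusive range
        long size = total / partitions;       // base size of each range
        long remainder = total % partitions;  // the first `remainder` ranges get one extra value
        List<String> predicates = new ArrayList<>(partitions);
        long start = lower;
        for (int i = 0; i < partitions; i++) {
            long len = size + (i < remainder ? 1 : 0);
            if (len == 0) {
                break; // more partitions than values: skip empty ranges
            }
            long end = start + len - 1;
            predicates.add(String.format("%s >= %d AND %s <= %d", column, start, column, end));
            start = end + 1;
        }
        return predicates;
    }
}
```

With the values from the config (`value`, 1, 100, 4) it yields four ranges of 25 values each; any remainder is spread over the first ranges, and empty ranges are skipped when `partition_num` exceeds the number of values.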





[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

531651225 commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1250517654

   > > > > > > > Can you add an e2e test case (Spark)?
   > > > > > > 
   > > > > > > 
   > > > > > > There is an okhttp jar conflict between Spark's jars and the InfluxDB connector. For now, the documentation works around it by telling users to replace the conflicting jar in Spark's `jars` directory.
   > > > > > 
   > > > > > 
   > > > > > @CalvinKirs PTAL
   > > > > 
   > > > > 
   > > > > We'd better be able to rename these jars @TyrantLucifer WDYT?
   > > > 
   > > > 
   > > > Yes, I agree with @CalvinKirs. You can add the `maven-shade` plugin to your pom to relocate (rename) the packages in these jars. BTW, could you please use `httpclient` instead of `okhttp`? It's better to unify the versions of our HTTP dependencies. In subsequent iterations, we need to abstract this HTTP-related functionality into a common module and uniformly replace it with the relocated httpclient. What do you think? @531651225 @CalvinKirs
   > > 
   > > 
   > > <img alt="image" width="420" src="https://user-images.githubusercontent.com/33744252/190942867-13f2ca43-59d8-425b-aa10-e2c29817821d.png">
   > > It seems impossible to replace okhttp with httpclient: okhttp is used internally by influxdb-java, and influxdb-java is the only officially provided way to access InfluxDB.
   > 
   > This is really bad news. But don't worry, we still have the ultimate weapon: the `maven-shade` plugin. We can use it to relocate (rename) the okhttp packages and avoid the jar conflict.
   
   Thanks for your advice, I'll try it. By the way, is the `maven-shade` plugin already used by any other connector?
   




[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

CalvinKirs commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1250477880

   > > > Can you add an e2e test case (Spark)?
   > > 
   > > 
   > > There is an okhttp jar conflict between Spark's jars and the InfluxDB connector. For now, the documentation works around it by telling users to replace the conflicting jar in Spark's `jars` directory.
   > 
   > @CalvinKirs PTAL
   
   We'd better be able to rename these jars @TyrantLucifer  WDYT?




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

EricJoy2048 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r981002536


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.converter;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InfluxDBRowConverter {
+
+    public static SeaTunnelRow convert(List<Object> values, SeaTunnelRowType typeInfo, List<Integer> indexList) {
+
+        List<Object> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+
+        for (int i = 0; i <= seaTunnelDataTypes.length - 1; i++) {
+            Object seatunnelField;
+            int columnIndex = indexList.get(i);
+            SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i];
+            if (null == values.get(columnIndex)) {
+                seatunnelField = null;
+            }
+            else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {

Review Comment:
   It would be better to use `seaTunnelDataType.getSqlType()`.
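A sketch of what switching on the SQL type could look like. To stay self-contained it uses a stand-in `SqlType` enum that only mirrors a few constants of SeaTunnel's `SqlType`, and plain lists instead of `SeaTunnelRow`. It also assumes numeric values arrive as some `java.lang.Number` (for example `Double`, as JSON-decoded numbers often are). `SqlTypeConverterSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class SqlTypeConverterSketch {

    // Hypothetical stand-in for SeaTunnel's SqlType enum (only the constants used here).
    enum SqlType { BOOLEAN, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING }

    // Converts one raw value to the Java type expected for its declared column type.
    // Assumes numeric values arrive as a java.lang.Number (e.g. Double).
    static Object convertField(Object raw, SqlType type) {
        if (raw == null) {
            return null;
        }
        switch (type) {
            case BOOLEAN:
                return Boolean.parseBoolean(raw.toString());
            case SMALLINT:
                return ((Number) raw).shortValue();
            case INT:
                return ((Number) raw).intValue();
            case BIGINT:
                return ((Number) raw).longValue();
            case FLOAT:
                return ((Number) raw).floatValue();
            case DOUBLE:
                return ((Number) raw).doubleValue();
            case STRING:
                return raw.toString();
            default:
                throw new IllegalArgumentException("Unsupported SqlType: " + type);
        }
    }

    // Same shape as convert(values, typeInfo, indexList) in the diff:
    // indexList.get(i) is the position in `values` of the i-th declared field.
    static List<Object> convert(List<Object> values, SqlType[] types, List<Integer> indexList) {
        List<Object> fields = new ArrayList<>(types.length); // pre-sized to the field count
        for (int i = 0; i < types.length; i++) {
            fields.add(convertField(values.get(indexList.get(i)), types[i]));
        }
        return fields;
    }
}
```

Switching on a single enum keeps the type dispatch in one place, rather than a chain of `equals` checks against individual `BasicType` constants.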



##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+    final InfluxDBConfig config;
+
+    private final Context<InfluxDBSourceSplit> context;
+    private Set<InfluxDBSourceSplit> pendingSplit;
+    private Set<InfluxDBSourceSplit> assignedSplit;
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+        this.context = context;
+        this.config = config;
+    }
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+        this(context, config);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+        this.assignedSplit = new HashSet<>();

Review Comment:
   `open()` is also called after the restoring constructor `public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config)`.
   
   Therefore, we cannot reset the splits in the `open()` method; doing so would discard the restored state.
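A minimal sketch of the pattern the reviewer is asking for, with plain `String` split IDs standing in for `InfluxDBSourceSplit` (the class and method names are hypothetical): both constructors initialize the split sets, and `open()`, which runs after either constructor, leaves them alone. Splits restored from state are then not queued a second time.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: split state is set up in the constructors, never in open().
class RestoreSafeEnumeratorSketch {
    private final Set<String> assignedSplit;
    private final Set<String> pendingSplit = new HashSet<>();

    // Fresh start: nothing has been assigned yet.
    RestoreSafeEnumeratorSketch() {
        this(Collections.emptySet());
    }

    // Restore path: previously assigned splits come from the checkpointed state.
    RestoreSafeEnumeratorSketch(Set<String> restoredAssigned) {
        this.assignedSplit = new HashSet<>(restoredAssigned);
    }

    // Called after either constructor; must NOT reset assignedSplit.
    void open() {
        // e.g. open client connections here
    }

    // Queues only the splits that were not already assigned before a restore.
    void addDiscoveredSplits(Set<String> allSplits) {
        for (String split : allSplits) {
            if (!assignedSplit.contains(split)) {
                pendingSplit.add(split);
            }
        }
    }

    Set<String> pending() {
        return Collections.unmodifiableSet(pendingSplit);
    }
}
```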





[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

531651225 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r984521608


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+    final InfluxDBConfig config;
+
+    private final Context<InfluxDBSourceSplit> context;
+    private Set<InfluxDBSourceSplit> pendingSplit;
+    private Set<InfluxDBSourceSplit> assignedSplit;
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+        this.context = context;
+        this.config = config;
+    }
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+        this(context, config);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+        this.assignedSplit = new HashSet<>();

Review Comment:
   > `open()` is also called after the restoring constructor `public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config)`.
   > 
   > Therefore, we cannot reset the splits in the `open()` method; doing so would discard the restored state.
   
   already fixed @hailin0 @EricJoy2048 





[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

531651225 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r990626363


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InfluxDBRowConverter {
+
+    public static SeaTunnelRow convert(List<Object> values, SeaTunnelRowType typeInfo, List<Integer> indexList) {
+
+        List<Object> fields = new ArrayList<>();

Review Comment:
   > It would be better to give it an initial size.
   
   Already fixed.





[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

531651225 commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1250498242

   > > > > > Can you add an e2e test case (Spark)?
   > > > > 
   > > > > 
   > > > > There is an okhttp jar conflict between Spark's jars and the InfluxDB connector. For now, the documentation works around it by telling users to replace the conflicting jar in Spark's `jars` directory.
   > > > 
   > > > 
   > > > @CalvinKirs PTAL
   > > 
   > > 
   > > We'd better be able to rename these jars @TyrantLucifer WDYT?
   > 
   > Yes, I agree with @CalvinKirs. You can add the `maven-shade` plugin to your pom to relocate (rename) the packages in these jars. BTW, could you please use `httpclient` instead of `okhttp`? It's better to unify the versions of our HTTP dependencies. In subsequent iterations, we need to abstract this HTTP-related functionality into a common module and uniformly replace it with the relocated httpclient. What do you think? @531651225 @CalvinKirs
   
   <img width="420" alt="image" src="https://user-images.githubusercontent.com/33744252/190942867-13f2ca43-59d8-425b-aa10-e2c29817821d.png">
   
   It seems impossible to replace okhttp with httpclient: okhttp is used internally by influxdb-java,
   and influxdb-java is the only officially provided way to access InfluxDB.
   




[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

TyrantLucifer commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1250512672

   > > > > > > Can you add an e2e test case (Spark)?
   > > > > > 
   > > > > > 
   > > > > > There is an okhttp jar conflict between Spark's jars and the InfluxDB connector. For now, the documentation works around it by telling users to replace the conflicting jar in Spark's `jars` directory.
   > > > > 
   > > > > 
   > > > > @CalvinKirs PTAL
   > > > 
   > > > 
   > > > We'd better be able to rename these jars @TyrantLucifer WDYT?
   > > 
   > > 
   > > Yes, I agree with @CalvinKirs. You can add the `maven-shade` plugin to your pom to relocate (rename) the packages in these jars. BTW, could you please use `httpclient` instead of `okhttp`? It's better to unify the versions of our HTTP dependencies. In subsequent iterations, we need to abstract this HTTP-related functionality into a common module and uniformly replace it with the relocated httpclient. What do you think? @531651225 @CalvinKirs
   > 
   > <img alt="image" width="420" src="https://user-images.githubusercontent.com/33744252/190942867-13f2ca43-59d8-425b-aa10-e2c29817821d.png">
   > 
   > It seems impossible to replace okhttp with httpclient: okhttp is used internally by influxdb-java, and influxdb-java is the only officially provided way to access InfluxDB.
   
   This is really bad news. But don't worry, we still have the ultimate weapon: the `maven-shade` plugin. We can use it to relocate (rename) the okhttp packages and avoid the jar conflict.
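Conceptually, a `maven-shade` relocation rule maps a package prefix (here `okhttp3`) to a new prefix and rewrites every reference to it inside the shaded jar, so the connector's okhttp classes no longer share names with Spark's. The sketch below models only that name mapping; the real plugin also rewrites the bytecode references, which this does not attempt. `RelocationSketch` is not a real API, and the shaded prefix in the usage note is a hypothetical choice.

```java
// Hypothetical model of the class-name mapping done by one relocation rule
// (pattern -> shadedPattern). It does not touch bytecode.
class RelocationSketch {

    static String relocate(String className, String pattern, String shadedPattern) {
        // This sketch matches the prefix only as a whole package name, so that
        // "okhttp3" does not accidentally match a class such as "okhttp3x.Foo".
        if (className.equals(pattern)) {
            return shadedPattern;
        }
        if (className.startsWith(pattern + ".")) {
            return shadedPattern + className.substring(pattern.length());
        }
        return className; // not covered by this rule; left unchanged
    }
}
```

For example, `relocate("okhttp3.OkHttpClient", "okhttp3", "org.apache.seatunnel.shade.okhttp3")` returns `"org.apache.seatunnel.shade.okhttp3.OkHttpClient"`, while classes outside `okhttp3` come back unchanged. In practice okhttp's own dependency, okio, may need a rule of its own as well.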




[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

EricJoy2048 commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1249289723

   > > Can you add an e2e test case (Spark)?
   > 
   > There is an okhttp jar conflict between Spark's jars and the InfluxDB connector. For now, the documentation works around it by telling users to replace the conflicting jar in Spark's `jars` directory.
   
   @CalvinKirs  PTAL




[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2697: [Feature][Connector-V2] Add influxDB connector source

531651225 commented on PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#issuecomment-1251847756

   > @531651225, is this PR ready for review again? If so, @hailin0 @TyrantLucifer @Hisoka-X, please review this PR again.
   
   Okay, please help me review it. Thanks!




[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

ashulin commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r984612412


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBRowConverter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Slf4j
+public class InfluxdbSourceReader implements SourceReader<SeaTunnelRow, InfluxDBSourceSplit> {
+    private InfluxDB influxDB;
+    InfluxDBConfig config;
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private Set<InfluxDBSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    List<Integer> columnsIndexList;
+
+    InfluxdbSourceReader(InfluxDBConfig config, Context readerContext, SeaTunnelRowType seaTunnelRowType, List<Integer> columnsIndexList) {
+        this.config = config;
+        this.sourceSplits = new HashSet<>();
+        this.context = readerContext;
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.columnsIndexList = columnsIndexList;
+    }
+
+    public void connect() throws ConnectException {
+        if (influxDB == null) {
+            influxDB = InfluxDBClient.getInfluxDB(config);
+            String version = influxDB.version();
+            if (!influxDB.ping().isGood()) {
+                String errorMessage =
+                        String.format(
+                                "connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}",
+                                config.getUrl());
+                throw new ConnectException(errorMessage);
+            }
+            log.info("connect influxdb successful. server version :{}.", version);
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        connect();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (influxDB != null) {
+            influxDB.close();
+            influxDB = null;
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        sourceSplits.forEach(source -> {
+            try {
+                read(source, output);
+            } catch (Exception e) {
+                throw new RuntimeException("influxDB source read error", e);
+            }
+        });
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded fake source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public List<InfluxDBSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return null;

Review Comment:
   Please add a split snapshot.
   You can refer to https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
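A minimal sketch of the requested change, with `String` IDs standing in for `InfluxDBSourceSplit` and a hypothetical class name. It follows the general idea of the linked IoTDB reader (return the splits the reader currently owns) but is not copied from it. Returning a copy means later changes to the live set do not alter a snapshot that has already been taken, and returning `null` instead leaves nothing to hand back on restore.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a reader that checkpoints its splits instead of returning null.
class SnapshottingReaderSketch {
    private final Set<String> sourceSplits = new HashSet<>();

    // Called with newly assigned (or restored) splits.
    void addSplits(List<String> splits) {
        sourceSplits.addAll(splits);
    }

    // Returns a copy of the splits this reader currently owns so they can be
    // passed back through addSplits() when the job is restored.
    List<String> snapshotState(long checkpointId) {
        return new ArrayList<>(sourceSplits);
    }
}
```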





[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

531651225 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r985010223


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBRowConverter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Slf4j
+public class InfluxdbSourceReader implements SourceReader<SeaTunnelRow, InfluxDBSourceSplit> {
+    private InfluxDB influxDB;
+    InfluxDBConfig config;
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private Set<InfluxDBSourceSplit> sourceSplits;
+
+    private final Context context;
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    List<Integer> columnsIndexList;
+
+    InfluxdbSourceReader(InfluxDBConfig config, Context readerContext, SeaTunnelRowType seaTunnelRowType, List<Integer> columnsIndexList) {
+        this.config = config;
+        this.sourceSplits = new HashSet<>();
+        this.context = readerContext;
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.columnsIndexList = columnsIndexList;
+    }
+
+    public void connect() throws ConnectException {
+        if (influxDB == null) {
+            influxDB = InfluxDBClient.getInfluxDB(config);
+            String version = influxDB.version();
+            if (!influxDB.ping().isGood()) {
+                String errorMessage =
+                        String.format(
+                                "connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}",
+                                config.getUrl());
+                throw new ConnectException(errorMessage);
+            }
+            log.info("connect influxdb successful. server version :{}.", version);
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        connect();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (influxDB != null) {
+            influxDB.close();
+            influxDB = null;
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (sourceSplits.isEmpty()) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        sourceSplits.forEach(source -> {
+            try {
+                read(source, output);
+            } catch (Exception e) {
+                throw new RuntimeException("influxDB source read error", e);
+            }
+        });
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded fake source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public List<InfluxDBSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return null;

Review Comment:
   > Please add split snapshot. you can see https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
   
   already fixed
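The request above is that `snapshotState` return the reader's current splits instead of `null`, so that a checkpoint records which splits the reader owns. The sketch below shows that shape without SeaTunnel's API. It assumes splits are plain `String` ids, and `ReaderSnapshotSketch` and `addSplits` are hypothetical names, not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reader-side state holder; splits are modeled as String ids.
final class ReaderSnapshotSketch {
    // Insertion-ordered so snapshots list splits in the order they were received.
    private final Set<String> splits = new LinkedHashSet<>();

    // Called when the enumerator hands this reader more splits.
    void addSplits(List<String> newSplits) {
        splits.addAll(newSplits);
    }

    // Return a defensive copy (never null) so later mutations don't leak into the checkpoint.
    List<String> snapshotState(long checkpointId) {
        return new ArrayList<>(splits);
    }
}
```

The quoted reader keeps its splits in a `HashSet`. The sketch uses a `LinkedHashSet` only to make snapshot order predictable. The important part is returning a copy rather than `null` or the live collection.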



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r966591859


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf:
##########
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select * from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "value"
+        fields {
+            label = STRING
+            value = INT
+            rt = STRING
+            time = BIGINT

Review Comment:
   test all datatypes?





[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r980928040


##########
docs/en/connector-v2/source/InfluxDB.md:
##########
@@ -0,0 +1,170 @@
+# InfluxDB
+
+> InfluxDB source connector
+
+## Description
+
+Read data from InfluxDB.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+
+Supports SQL queries, so column projection can be achieved through the query itself.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+
+## Options
+
+| name               | type   | required | default value |
+|--------------------|--------|----------|---------------|
+| url                | string | yes      | -             |
+| sql                | string | yes      | -             |
+| fields             | config | yes      | -             |
+| database           | string | yes      | -             |
+| username           | string | no       | -             |
+| password           | string | no       | -             |
+| lower_bound        | long   | no       | -             |
+| upper_bound        | long   | no       | -             |
+| partition_num      | int    | no       | -             |
+| split_column       | string | no       | -             |
+| epoch              | string | no       | n             |
+| connect_timeout_ms | long   | no       | 15000         |
+| query_timeout_sec  | int    | no       | 3             |

Review Comment:
   > check style
   
   already fixed 
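The `lower_bound`, `upper_bound`, `partition_num` and `split_column` options suggest a range-partitioned read: the bounds are cut into `partition_num` sub-ranges, and each split queries one of them. The enumerator's splitting code is not shown in this diff, so the following only sketches one common approach. It assumes integer bounds and inclusive ranges. `RangeSplitter.splitQueries` is a hypothetical helper, and the generated `where` clause is illustrative rather than the connector's exact SQL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: cuts [lowerBound, upperBound] into contiguous, inclusive ranges.
final class RangeSplitter {
    private RangeSplitter() {}

    // Returns one query per partition, each restricted to a sub-range of splitColumn.
    static List<String> splitQueries(String sql, String splitColumn,
                                     long lowerBound, long upperBound, int partitionNum) {
        if (partitionNum <= 0 || lowerBound > upperBound) {
            throw new IllegalArgumentException("need partitionNum > 0 and lowerBound <= upperBound");
        }
        long total = upperBound - lowerBound + 1;          // integer values covered
        int parts = (int) Math.min(partitionNum, total);   // never create empty ranges
        long base = total / parts;
        long remainder = total % parts;                    // first `remainder` ranges get one extra value
        // Extend an existing WHERE clause if the user query already has one.
        String glue = sql.toLowerCase(Locale.ROOT).contains(" where ") ? " and " : " where ";
        List<String> queries = new ArrayList<>();
        long start = lowerBound;
        for (int i = 0; i < parts; i++) {
            long end = start + base - 1 + (i < remainder ? 1 : 0);
            queries.add(String.format("%s%s%s >= %d and %s <= %d",
                    sql, glue, splitColumn, start, splitColumn, end));
            start = end + 1;
        }
        return queries;
    }
}
```

The e2e config in this PR sets `lower_bound = 1`, `upper_bound = 100`, `partition_num = 4` and `split_column = "value"`. With those values, this sketch yields four queries covering 1 to 25, 26 to 50, 51 to 75 and 76 to 100.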





[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697#discussion_r984521090


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.converter;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InfluxDBRowConverter {
+
+    public static SeaTunnelRow convert(List<Object> values, SeaTunnelRowType typeInfo, List<Integer> indexList) {
+
+        List<Object> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+
+        for (int i = 0; i < seaTunnelDataTypes.length; i++) {
+            Object seatunnelField;
+            int columnIndex = indexList.get(i);
+            SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i];
+            if (null == values.get(columnIndex)) {
+                seatunnelField = null;
+            }
+            else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {

Review Comment:
   already fixed
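`InfluxDBRowConverter.convert` walks the declared field types and uses `indexList` to pick each field's column from the query result, passing nulls through. Below is a dependency-free sketch of that index-based projection. It assumes numeric values arrive as `Double`, which is typical when the JSON response is decoded by a generic parser. `FieldType` and `RowConverterSketch` are hypothetical stand-ins for SeaTunnel's `SeaTunnelDataType` and `SeaTunnelRow`.

```java
import java.util.List;

// Hypothetical subset of field types; the real converter works with SeaTunnelDataType.
enum FieldType { BOOLEAN, INT, BIGINT, DOUBLE, STRING }

final class RowConverterSketch {
    private RowConverterSketch() {}

    // types[i] is the declared type of output field i; indexList.get(i) is its column in `values`.
    static Object[] convert(List<Object> values, FieldType[] types, List<Integer> indexList) {
        Object[] fields = new Object[types.length];
        for (int i = 0; i < types.length; i++) {
            Object raw = values.get(indexList.get(i));
            if (raw == null) {          // missing value in this point: keep it null
                fields[i] = null;
                continue;
            }
            switch (types[i]) {
                case BOOLEAN: fields[i] = Boolean.valueOf(raw.toString()); break;
                // Assumes numbers arrive as java.lang.Number (e.g. Double from JSON).
                case INT:     fields[i] = ((Number) raw).intValue(); break;
                case BIGINT:  fields[i] = ((Number) raw).longValue(); break;
                case DOUBLE:  fields[i] = ((Number) raw).doubleValue(); break;
                default:      fields[i] = raw.toString();
            }
        }
        return fields; // the real converter would wrap this in a SeaTunnelRow
    }
}
```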



##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+    final InfluxDBConfig config;
+
+    private final Context<InfluxDBSourceSplit> context;
+    private Set<InfluxDBSourceSplit> pendingSplit;
+    private Set<InfluxDBSourceSplit> assignedSplit;
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+        this.context = context;
+        this.config = config;
+    }
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+        this(context, config);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+        this.assignedSplit = new HashSet<>();

Review Comment:
   already fixed
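In the snippet as quoted, the restore constructor sets `assignedSplit` from the checkpointed state, and `open()` then replaces it with a new empty set, so restored progress would be lost. Whatever form the final fix took, the intended behaviour can be sketched as follows. The sketch assumes splits are `String` ids and that each split's reader is chosen by hashing its id modulo the reader count. `SplitAssignerSketch` and its methods are hypothetical, not the connector's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical enumerator-side bookkeeping; splits are modeled as String ids.
final class SplitAssignerSketch {
    private final Set<String> pendingSplits = new HashSet<>();
    private final Set<String> assignedSplits;

    // Fresh start: nothing assigned yet.
    SplitAssignerSketch() {
        this(new HashSet<>());
    }

    // Restore: keep the checkpointed assignments; nothing later resets them.
    SplitAssignerSketch(Set<String> restoredAssigned) {
        this.assignedSplits = new HashSet<>(restoredAssigned);
    }

    // Queue newly computed splits, skipping those already assigned before the restore.
    void addDiscovered(List<String> splitIds) {
        for (String id : splitIds) {
            if (!assignedSplits.contains(id)) {
                pendingSplits.add(id);
            }
        }
    }

    // Non-negative hash modulo reader count picks the owning reader.
    static int owner(String splitId, int numReaders) {
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    // Groups pending splits by owning reader and moves them to the assigned set.
    Map<Integer, List<String>> assign(int numReaders) {
        if (numReaders <= 0) {
            throw new IllegalArgumentException("numReaders must be positive");
        }
        Map<Integer, List<String>> byReader = new HashMap<>();
        for (String id : pendingSplits) {
            byReader.computeIfAbsent(owner(id, numReaders), k -> new ArrayList<>()).add(id);
        }
        assignedSplits.addAll(pendingSplits);
        pendingSplits.clear();
        return byReader;
    }

    // What a checkpoint of the enumerator would record.
    Set<String> snapshotAssigned() {
        return new HashSet<>(assignedSplits);
    }
}
```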





[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #2697: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 merged PR #2697:
URL: https://github.com/apache/incubator-seatunnel/pull/2697

