You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/01 15:05:17 UTC

[GitHub] [incubator-seatunnel] 531651225 opened a new pull request, #2595: [Feature][Connector-V2] Add influxDB connector source

531651225 opened a new pull request, #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   https://github.com/apache/incubator-seatunnel/issues/1946
   Add InfluxDB Connector Source
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [x] Code changes are covered by tests, or they do not need tests for the following reason:
   * [x] If any new JAR binary package is added in your PR, please add a license notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961229663


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+    final InfluxDBConfig config;
+
+    private final Context<InfluxDBSourceSplit> context;
+    private Set<InfluxDBSourceSplit> pendingSplit;
+    private Set<InfluxDBSourceSplit> assignedSplit;
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+        this.context = context;
+        this.config = config;
+    }
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+        this(context, config);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+        this.assignedSplit = new HashSet<>();
+        this.pendingSplit = new HashSet<>();
+    }
+
+    @Override
+    public void run() throws Exception {
+        pendingSplit = getInfluxDBSplit();
+        assignSplit(context.registeredReaders());
+    }
+
+    @Override
+    public void close() throws IOException {
+        //nothing to do
+    }
+
+    @Override
+    public void addSplitsBack(List splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        //nothing to do
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public InfluxDBSourceState snapshotState(long checkpointId) throws Exception {
+        return new InfluxDBSourceState(assignedSplit);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        //nothing to do
+
+    }
+
+    private Set<InfluxDBSourceSplit> getInfluxDBSplit() {
+        String sql = config.getSql();
+        Set<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<>();
+        // no need numPartitions, use one partition
+        if (config.getPartitionNum() == 0) {
+            influxDBSourceSplits.add(new InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql));
+            return influxDBSourceSplits;
+        }
+        long start = config.getLowerBound();
+        long end = config.getUpperBound();
+        int numPartitions = config.getPartitionNum();
+        String[] sqls = sql.split(SQL_WHERE);
+        if (sqls.length > 2) {
+            throw new IllegalArgumentException("sql should not contain more than one where");
+        }
+        int size = (int) (end - start) / numPartitions + 1;
+        int remainder = (int) ((end + 1 - start) % numPartitions);
+        if (end - start < numPartitions) {
+            numPartitions = (int) (end - start);
+        }
+        long currentStart = start;
+        int i = 0;
+        while (i < numPartitions) {
+            String query = " where (" + config.getSplitKey() + " >= " + currentStart + " and " + config.getSplitKey()  + " < " + (currentStart + size) + ") ";
+            i++;
+            currentStart += size;
+            if (i + 1 <= numPartitions) {
+                currentStart = currentStart - remainder;
+            }
+            query = sqls[0] + query;
+            if (sqls.length > 1) {
+                query = query + " and ( " + sqls[1] + " ) ";
+            }
+            influxDBSourceSplits.add(new InfluxDBSourceSplit(String.valueOf(i + System.nanoTime()), query));
+        }
+        return influxDBSourceSplits;
+    }

Review Comment:
   Extract this piece of logic into a public method? WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961266752


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToConsoleIT.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.influxdb;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class InfluxDBSourceToConsoleIT extends FlinkContainer {

Review Comment:
   Can you add a connector-v2 e2e test case for Spark?



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/influxdb/influxdb_source_to_console.conf:
##########
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select * from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "value"
+    }
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  Console {}

Review Comment:
   Could you use the `Assert` sink to check the data types?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961804298


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToConsoleIT.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.influxdb;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class InfluxDBSourceToConsoleIT extends FlinkContainer {

Review Comment:
    Spark E2E has a JAR package conflict. Members of the community WeChat group suggested following https://github.com/apache/incubator-seatunnel/issues/2607. Could I add it after the community resolves that issue?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961232826


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+    final InfluxDBConfig config;
+
+    private final Context<InfluxDBSourceSplit> context;
+    private Set<InfluxDBSourceSplit> pendingSplit;
+    private Set<InfluxDBSourceSplit> assignedSplit;
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+        this.context = context;
+        this.config = config;
+    }
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+        this(context, config);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+        this.assignedSplit = new HashSet<>();
+        this.pendingSplit = new HashSet<>();
+    }
+
+    @Override
+    public void run() throws Exception {
+        pendingSplit = getInfluxDBSplit();
+        assignSplit(context.registeredReaders());
+    }
+
+    @Override
+    public void close() throws IOException {
+        //nothing to do
+    }
+
+    @Override
+    public void addSplitsBack(List splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        //nothing to do
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public InfluxDBSourceState snapshotState(long checkpointId) throws Exception {
+        return new InfluxDBSourceState(assignedSplit);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        //nothing to do
+
+    }
+
+    private Set<InfluxDBSourceSplit> getInfluxDBSplit() {
+        String sql = config.getSql();
+        Set<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<>();
+        // no need numPartitions, use one partition
+        if (config.getPartitionNum() == 0) {
+            influxDBSourceSplits.add(new InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql));
+            return influxDBSourceSplits;
+        }
+        long start = config.getLowerBound();
+        long end = config.getUpperBound();
+        int numPartitions = config.getPartitionNum();
+        String[] sqls = sql.split(SQL_WHERE);
+        if (sqls.length > 2) {
+            throw new IllegalArgumentException("sql should not contain more than one where");
+        }
+        int size = (int) (end - start) / numPartitions + 1;
+        int remainder = (int) ((end + 1 - start) % numPartitions);
+        if (end - start < numPartitions) {
+            numPartitions = (int) (end - start);
+        }
+        long currentStart = start;
+        int i = 0;
+        while (i < numPartitions) {
+            String query = " where (" + config.getSplitKey() + " >= " + currentStart + " and " + config.getSplitKey()  + " < " + (currentStart + size) + ") ";
+            i++;
+            currentStart += size;
+            if (i + 1 <= numPartitions) {
+                currentStart = currentStart - remainder;
+            }
+            query = sqls[0] + query;
+            if (sqls.length > 1) {
+                query = query + " and ( " + sqls[1] + " ) ";
+            }
+            influxDBSourceSplits.add(new InfluxDBSourceSplit(String.valueOf(i + System.nanoTime()), query));
+        }
+        return influxDBSourceSplits;
+    }

Review Comment:
   > extract this piece of logic as a public method? WDYT?
   
   It's a good suggestion. Should we abstract it into a public method in the seatunnel-common module?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r962443336


##########
docs/en/connector-v2/source/InfluxDB.md:
##########
@@ -0,0 +1,114 @@
+# InfluxDB

Review Comment:
   Hi, in https://github.com/apache/incubator-seatunnel/pull/2625 we have redefined the documentation format. Please update this document to follow the latest format.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961797002


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.QUERY_FIELD_SQL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.QUERY_TAG_SQL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBTypeMapper.INFLUXDB_BIGINT;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@AutoService(SeaTunnelSource.class)
+public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, InfluxDBSourceSplit, InfluxDBSourceState>  {
+    private SeaTunnelRowType seaTunnelRowType;
+    private InfluxDBConfig influxDBConfig;
+
+    private static final String INFLUXDB_TIME_KEY = "time";
+    private static final String INFLUXDB_TIME_DATATYPE = INFLUXDB_BIGINT;
+
+    @Override
+    public String getPluginName() {
+        return "InfluxDB";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URL, SQL);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+        try {
+            this.influxDBConfig = new InfluxDBConfig(config);
+            InfluxDB influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
+            seaTunnelRowType = initTableField(influxDB, influxDBConfig);

Review Comment:
   > hi, we need user config this. //
   > 
   > ```
   >     SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(pluginConfig);
   >     this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
   > ```
   > 
   > #2469
   
   Hi, I have changed this in a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#issuecomment-1236962310

   <img width="1524" alt="image" src="https://user-images.githubusercontent.com/14371345/188451711-792053b1-0b5b-4db2-a65b-6b628d55af10.png">
   https://pipelines.actions.githubusercontent.com/serviceHosts/5f1d2e93-1249-42e2-b16d-ed31c598c5d5/_apis/pipelines/1/runs/17499/signedlogcontent/36?urlExpires=2022-09-05T12%3A40%3A16.6214641Z&urlSigningMethod=HMACV1&urlSignature=pxCWdlIluLkqSV6lJ5ns5Fi9zSQKLy5Id%2FTDwE6xljY%3D
   
   
   
   Please check for dependency JAR conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r964727140


##########
pom.xml:
##########
@@ -433,6 +434,12 @@
                 <version>${jwt.version}</version>
                 <scope>runtime</scope>
             </dependency>
+
+            <dependency>
+                <groupId>org.influxdb</groupId>
+                <artifactId>influxdb-java</artifactId>
+                <version>${influxdb.version}</version>
+            </dependency>

Review Comment:
   > Connector-specific dependencies are now managed under seatunnel-connectors-v2/connector-influxdb/pom.xml
   
   OK, I have fixed this in a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961797275


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
+    final InfluxDBConfig config;
+
+    private final Context<InfluxDBSourceSplit> context;
+    private Set<InfluxDBSourceSplit> pendingSplit;
+    private Set<InfluxDBSourceSplit> assignedSplit;
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBConfig config) {
+        this.context = context;
+        this.config = config;
+    }
+
+    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, InfluxDBConfig config) {
+        this(context, config);
+        this.assignedSplit = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+        this.assignedSplit = new HashSet<>();
+        this.pendingSplit = new HashSet<>();
+    }
+
+    @Override
+    public void run() throws Exception {
+        pendingSplit = getInfluxDBSplit();
+        assignSplit(context.registeredReaders());
+    }
+
+    @Override
+    public void close() throws IOException {
+        //nothing to do
+    }
+
+    @Override
+    public void addSplitsBack(List splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        //nothing to do
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        if (!pendingSplit.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
+        }
+    }
+
+    @Override
+    public InfluxDBSourceState snapshotState(long checkpointId) throws Exception {
+        return new InfluxDBSourceState(assignedSplit);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        //nothing to do
+
+    }
+
+    private Set<InfluxDBSourceSplit> getInfluxDBSplit() {
+        String sql = config.getSql();
+        Set<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<>();
+        // no need numPartitions, use one partition
+        if (config.getPartitionNum() == 0) {
+            influxDBSourceSplits.add(new InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql));
+            return influxDBSourceSplits;
+        }
+        long start = config.getLowerBound();
+        long end = config.getUpperBound();
+        int numPartitions = config.getPartitionNum();
+        String[] sqls = sql.split(SQL_WHERE);
+        if (sqls.length > 2) {
+            throw new IllegalArgumentException("sql should not contain more than one where");
+        }
+        int size = (int) (end - start) / numPartitions + 1;
+        int remainder = (int) ((end + 1 - start) % numPartitions);
+        if (end - start < numPartitions) {
+            numPartitions = (int) (end - start);
+        }
+        long currentStart = start;
+        int i = 0;
+        while (i < numPartitions) {
+            String query = " where (" + config.getSplitKey() + " >= " + currentStart + " and " + config.getSplitKey()  + " < " + (currentStart + size) + ") ";
+            i++;
+            currentStart += size;
+            if (i + 1 <= numPartitions) {
+                currentStart = currentStart - remainder;
+            }
+            query = sqls[0] + query;
+            if (sqls.length > 1) {
+                query = query + " and ( " + sqls[1] + " ) ";
+            }
+            influxDBSourceSplits.add(new InfluxDBSourceSplit(String.valueOf(i + System.nanoTime()), query));
+        }
+        return influxDBSourceSplits;
+    }

Review Comment:
   I have changed this in a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#issuecomment-1234415606

   E2E
   <img width="1085" alt="图片" src="https://user-images.githubusercontent.com/33744252/187948690-83847edf-782d-4ed6-8e3f-e5a3bfad012a.png">
   <img width="1323" alt="图片" src="https://user-images.githubusercontent.com/33744252/187948462-8b0b9f84-ad54-425b-8769-4efa17a80724.png">
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961225005


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.QUERY_FIELD_SQL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.QUERY_TAG_SQL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
+import static org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBTypeMapper.INFLUXDB_BIGINT;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@AutoService(SeaTunnelSource.class)
+public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, InfluxDBSourceSplit, InfluxDBSourceState>  {
+    private SeaTunnelRowType seaTunnelRowType;
+    private InfluxDBConfig influxDBConfig;
+
+    private static final String INFLUXDB_TIME_KEY = "time";
+    private static final String INFLUXDB_TIME_DATATYPE = INFLUXDB_BIGINT;
+
+    @Override
+    public String getPluginName() {
+        return "InfluxDB";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URL, SQL);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+        try {
+            this.influxDBConfig = new InfluxDBConfig(config);
+            InfluxDB influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
+            seaTunnelRowType = initTableField(influxDB, influxDBConfig);

Review Comment:
   Hi, we need the user to configure the schema here, for example:
   
           SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(pluginConfig);
           this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
   
   https://github.com/apache/incubator-seatunnel/issues/2469



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961798206


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/influxdb/influxdb_source_to_console.conf:
##########
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    InfluxDB {
+        url = "http://influxdb-host:8086"
+        sql = "select * from test"
+        database = "test"
+        upper_bound = 100
+        lower_bound = 1
+        partition_num = 4
+        split_column = "value"
+    }
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  Console {}

Review Comment:
   > Use `Assert` sink check datatype?
   
   Hi, I have changed the e2e test in a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r964412806


##########
docs/en/connector-v2/source/InfluxDB.md:
##########
@@ -0,0 +1,134 @@
+# InfluxDB
+
+> InfluxDB source connector
+
+## Description
+
+Read external data source data through InfluxDB.
+
+> Tips: To resolve the conflict between the influxdb connector and the spark jars, you need to
+replace the jar files of okhttp and okio in the spark jars directory to solve this problem.the command is as follows:
+>  ```
+> rm -f $SPARK_HOME/jars/okhttp-*.jar $SPARK_HOME/jars/okio-*.jar
+> cp okhttp-4.9.0.jar okio-2.8.0.jar $SPARK_HOME/jars
+>  ```
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+
+## Options
+
+| name        | type    | required | default value |
+|-------------|---------|----------|---------------|
+| url         | string  | yes      | -             |
+| sql         | string  | yes      | -             |
+| fields                     | config  | yes      | -             |
+| database            | string  | yes      |               |
+| username    | string  | no       | -             |
+| password    | string  | no       | -             |
+| lower_bound | long    | no       | -             |
+| upper_bound | long    | no       | -             |
+| partition_num | int     | no       | -             |
+| split_column | string     | no       | -             |
+| epoch | string     | no       | n             |
+| connect_timeout_ms | long     | no       | 15000         |
+| query_timeout_sec | int     | no       | 3             |
+
+### url
+the url to connect to influxDB e.g.
+``` 
+http://influxdb-host:8086
+```
+
+### sql [string]
+The query sql used to search data
+
+```
+select name,age from test
+```
+
+### fields [string]
+
+the fields of the InfluxDB when you select
+
+the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
+
+e.g.
+
+```
+fields{
+    name=STRING
+    age=INT
+    }
+```
+
+### database [string]
+
+The `influxDB` database
+
+### username [string]
+
+the username of the influxDB when you select
+
+### password [string]
+
+the password of the influxDB when you select
+
+### split_column [string]
+
+the `split_column` of the influxDB when you select
+
+> Tips:
+> - influxDB tags is not supported as a segmented primary key because the type of tags can only be a string
+> - influxDB time is not supported as a segmented primary key because the time field cannot participate in mathematical calculation
+> - Currently, `split_column` only supports integer data segmentation, and does not support `float`, `string`, `date` and other types.
+
+### upper_bound [long]
+
+upper bound of the `split_column`column
+
+### lower_bound [long]
+
+lower bound of the `split_column` column
+
+```
+     split the $split_column range into $partition_num parts
+     if partition_num is 1, use the whole `split_column` range
+     if partition_num > (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions
+     
+     eg: lower_bound = 1, upper_bound = 10, partition_num = 2
+     sql = "select * from test where age > 0 and age < 10"
+     
+     split result
+
+     split 1: select * from test where ($split_column >= 1 and $split_column < 6)  and (  age > 0 and age < 10 )
+     
+     split 2: select * from test where ($split_column >= 6 and $split_column < 11) and (  age > 0 and age < 10 )
+
+```
+
+### partition_num [int]
+
+the `partition_num` of the InfluxDB when you select
+> Tips: Ensure that `upper_bound` minus `lower_bound` is divisible by `partition_num`; otherwise the query results will overlap
+
+### epoch [string]
+returned time precision
+- Optional values: H, m, s, MS, u, n
+- default value: n
+
+### query_timeout_sec [int]
+the `query_timeout` of the InfluxDB when you select, in seconds
+
+### connect_timeout_ms [long]
+the timeout for connecting to InfluxDB, in milliseconds 

Review Comment:
   > Please add some example for this connector. You can refer to the documentation of other connectors.
   
    Thank you for your suggestion; I have added examples in a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r964614787


##########
pom.xml:
##########
@@ -433,6 +434,12 @@
                 <version>${jwt.version}</version>
                 <scope>runtime</scope>
             </dependency>
+
+            <dependency>
+                <groupId>org.influxdb</groupId>
+                <artifactId>influxdb-java</artifactId>
+                <version>${influxdb.version}</version>
+            </dependency>

Review Comment:
   Connector-specific dependencies are now managed under seatunnel-connectors-v2/connector-influxdb/pom.xml



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 closed pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 closed pull request #2595: [Feature][Connector-V2] Add influxDB connector source
URL: https://github.com/apache/incubator-seatunnel/pull/2595


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r962483624


##########
docs/en/connector-v2/source/InfluxDB.md:
##########
@@ -0,0 +1,114 @@
+# InfluxDB

Review Comment:
   OK, I have modified it in a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r964398768


##########
docs/en/connector-v2/source/InfluxDB.md:
##########
@@ -0,0 +1,134 @@
+# InfluxDB
+
+> InfluxDB source connector
+
+## Description
+
+Read external data source data through InfluxDB.
+
+> Tips: To resolve the conflict between the influxdb connector and the spark jars, you need to
+replace the jar files of okhttp and okio in the spark jars directory to solve this problem.the command is as follows:
+>  ```
+> rm -f $SPARK_HOME/jars/okhttp-*.jar $SPARK_HOME/jars/okio-*.jar
+> cp okhttp-4.9.0.jar okio-2.8.0.jar $SPARK_HOME/jars
+>  ```
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+
+## Options
+
+| name        | type    | required | default value |
+|-------------|---------|----------|---------------|
+| url         | string  | yes      | -             |
+| sql         | string  | yes      | -             |
+| fields                     | config  | yes      | -             |
+| database            | string  | yes      |               |
+| username    | string  | no       | -             |
+| password    | string  | no       | -             |
+| lower_bound | long    | no       | -             |
+| upper_bound | long    | no       | -             |
+| partition_num | int     | no       | -             |
+| split_column | string     | no       | -             |
+| epoch | string     | no       | n             |
+| connect_timeout_ms | long     | no       | 15000         |
+| query_timeout_sec | int     | no       | 3             |
+
+### url
+the url to connect to influxDB e.g.
+``` 
+http://influxdb-host:8086
+```
+
+### sql [string]
+The query sql used to search data
+
+```
+select name,age from test
+```
+
+### fields [string]
+
+the fields of the InfluxDB when you select
+
+the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
+
+e.g.
+
+```
+fields{
+    name=STRING
+    age=INT
+    }
+```
+
+### database [string]
+
+The `influxDB` database
+
+### username [string]
+
+the username of the influxDB when you select
+
+### password [string]
+
+the password of the influxDB when you select
+
+### split_column [string]
+
+the `split_column` of the influxDB when you select
+
+> Tips:
+> - influxDB tags is not supported as a segmented primary key because the type of tags can only be a string
+> - influxDB time is not supported as a segmented primary key because the time field cannot participate in mathematical calculation
+> - Currently, `split_column` only supports integer data segmentation, and does not support `float`, `string`, `date` and other types.
+
+### upper_bound [long]
+
+upper bound of the `split_column`column
+
+### lower_bound [long]
+
+lower bound of the `split_column` column
+
+```
+     split the $split_column range into $partition_num parts
+     if partition_num is 1, use the whole `split_column` range
+     if partition_num > (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions
+     
+     eg: lower_bound = 1, upper_bound = 10, partition_num = 2
+     sql = "select * from test where age > 0 and age < 10"
+     
+     split result
+
+     split 1: select * from test where ($split_column >= 1 and $split_column < 6)  and (  age > 0 and age < 10 )
+     
+     split 2: select * from test where ($split_column >= 6 and $split_column < 11) and (  age > 0 and age < 10 )
+
+```
+
+### partition_num [int]
+
+the `partition_num` of the InfluxDB when you select
+> Tips: Ensure that `upper_bound` minus `lower_bound` is divisible by `partition_num`; otherwise the query results will overlap
+
+### epoch [string]
+returned time precision
+- Optional values: H, m, s, MS, u, n
+- default value: n
+
+### query_timeout_sec [int]
+the `query_timeout` of the InfluxDB when you select, in seconds
+
+### connect_timeout_ms [long]
+the timeout for connecting to InfluxDB, in milliseconds 

Review Comment:
   Please add some example for this connector. You can refer to the documentation of other connectors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#issuecomment-1238914557

   > <img alt="image" width="1524" src="https://user-images.githubusercontent.com/14371345/188451711-792053b1-0b5b-4db2-a65b-6b628d55af10.png">
   > https://pipelines.actions.githubusercontent.com/serviceHosts/5f1d2e93-1249-42e2-b16d-ed31c598c5d5/_apis/pipelines/1/runs/17499/signedlogcontent/36?urlExpires=2022-09-05T12%3A40%3A16.6214641Z&urlSigningMethod=HMACV1&urlSignature=pxCWdlIluLkqSV6lJ5ns5Fi9zSQKLy5Id%2FTDwE6xljY%3D
   > 
   > Please check for dependency jar conflicts
    Thank you for your PR https://github.com/apache/incubator-seatunnel/pull/2648. I have solved the dependency jar conflict
   in the Flink e2e tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#issuecomment-1236603919

   @hailin0 @CalvinKirs @EricJoy2048 Hi all, could you please help review this?
   <img width="1064" alt="图片" src="https://user-images.githubusercontent.com/33744252/188382565-3b89ce07-b175-4d47-99f7-89a6554f99dd.png">
   
    The integration test phase fails. How can I solve this problem? The E2E tests pass locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961797493


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.client;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.HttpUrl;
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.impl.InfluxDBImpl;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class InfluxDBClient {
+    private static final long CONNECT_TIMEOUT = 15000;
+    public static InfluxDB getInfluxDB(InfluxDBConfig config) throws ConnectException {
+        OkHttpClient.Builder clientBuilder =
+                new OkHttpClient.Builder()
+                        .connectTimeout(CONNECT_TIMEOUT, TimeUnit.MILLISECONDS)
+                        .readTimeout(config.getQueryTimeOut(), TimeUnit.SECONDS);

Review Comment:
   > Do we need to make this field configurable?
   
   I have changed this in a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2595: [Feature][Connector-V2] Add influxDB connector source

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2595:
URL: https://github.com/apache/incubator-seatunnel/pull/2595#discussion_r961230462


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.client;
+
+import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.HttpUrl;
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.impl.InfluxDBImpl;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class InfluxDBClient {
+    private static final long CONNECT_TIMEOUT = 15000;
+    public static InfluxDB getInfluxDB(InfluxDBConfig config) throws ConnectException {
+        OkHttpClient.Builder clientBuilder =
+                new OkHttpClient.Builder()
+                        .connectTimeout(CONNECT_TIMEOUT, TimeUnit.MILLISECONDS)
+                        .readTimeout(config.getQueryTimeOut(), TimeUnit.SECONDS);

Review Comment:
   Do we need to make this field configurable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org