Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2021/03/10 19:56:29 UTC

[GitHub] [bahir-flink] 1p4pk opened a new pull request #114: InfluxDBv2.0 Connector

1p4pk opened a new pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114


   





[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r593939633



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);

Review comment:
       I took a rather different approach here. I made all the mandatory fields properties and initialized them to `null` in the constructor. Whenever `build()` is called, I check that none of these fields is null. WDYT?
   
   ```java
       private void sanityCheck() {
           // Check required settings.
           checkNotNull(this.influxDBUrl, "The InfluxDB URL is required but not provided.");
           checkNotNull(this.influxDBUsername, "The InfluxDB username is required but not provided.");
           checkNotNull(this.influxDBPassword, "The InfluxDB password is required but not provided.");
           checkNotNull(this.bucketName, "The Bucket name is required but not provided.");
           checkNotNull(this.organizationName, "The Organization name is required but not provided.");
        checkNotNull(this.influxDBSchemaSerializer, "Serialization schema is required but not provided.");
       }
   ```
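   
   For illustration, a minimal sketch of how `build()` could invoke this check (the constructor shape below is an assumption, not the final PR code):
   
   ```java
       public InfluxDBSink<IN> build() {
           this.sanityCheck(); // fail fast before the sink is constructed
           return new InfluxDBSink<>(this.influxDBSchemaSerializer, this.properties);
       }
   ```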




[GitHub] [bahir-flink] eskabetxe commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
eskabetxe commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r604271386



##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>

Review comment:
       this should be 2.11
   
   <artifactId>flink-connector-influxdb2_2.11</artifactId>

##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <spotless.version>2.7.0</spotless.version>
+    <druid.version>0.13.0-incubating</druid.version>
+    <!--  Test Properties  -->
+    <testcontainers.version>1.15.2</testcontainers.version>
+    <hamcrest.version>2.2</hamcrest.version>

Review comment:
       this is not used

##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <spotless.version>2.7.0</spotless.version>

Review comment:
       this could be removed

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <h3>Elements of line protocol</h3>
+ *
+ * <pre>
+ *
+ * measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200
+ * --------------- --------------- --------------------- -------------------
+ *      |               |                   |                     |
+ * Measurement       Tag set            Field set              Timestamp
+ *
+ * </pre>
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.
+ */
+public final class DataPoint {
+
+    private final String measurement;
+    private final Map<String, String> tags = new HashMap<>();
+    private final Map<String, Object> fields = new HashMap<>();
+    private final Long timestamp;
+
+    DataPoint(final String measurementName, @Nullable final Long timestamp) {
+        Arguments.checkNotNull(measurementName, "measurement");

Review comment:
       You are using `Arguments.checkNotNull` here; this could be `Preconditions.checkNotNull(measurementName, "measurement")` instead.
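   
   For illustration, a hedged sketch of that swap (assuming the rest of the class stays as in the diff above):
   
   ```java
   import static org.apache.flink.util.Preconditions.checkNotNull;
   
   // Flink's Preconditions returns the checked reference, so the null check
   // and the assignment collapse into one line.
   DataPoint(final String measurementName, @Nullable final Long timestamp) {
       this.measurement = checkNotNull(measurementName, "measurement");
       this.timestamp = timestamp;
   }
   ```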

##########
File path: pom.xml
##########
@@ -75,6 +75,7 @@
     <module>flink-connector-akka</module>
     <module>flink-connector-flume</module>
     <module>flink-connector-influxdb</module>
+    <module>flink-connector-influxdb2</module>

Review comment:
       Should we keep maintaining the 1.7 version? It's 10 versions behind the latest one.







[GitHub] [bahir-flink] 1p4pk commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r606789162



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <h3>Elements of line protocol</h3>
+ *
+ * <pre>
+ *
+ * measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200
+ * --------------- --------------- --------------------- -------------------
+ *      |               |                   |                     |
+ * Measurement       Tag set            Field set              Timestamp
+ *
+ * </pre>
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.
+ */
+public final class DataPoint {
+
+    private final String measurement;
+    private final Map<String, String> tags = new HashMap<>();
+    private final Map<String, Object> fields = new HashMap<>();
+    private final Long timestamp;
+
+    DataPoint(final String measurementName, @Nullable final Long timestamp) {
+        Arguments.checkNotNull(measurementName, "measurement");

Review comment:
       Yes, but it does not make sense to use both `Arguments.xxx` and `Preconditions.xxx`. In line 91, for example, we use `Arguments.checkNonEmpty`.







[GitHub] [bahir-flink] 1p4pk commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r616390584



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ *     Apache Druid InfluxDB Parser</a>
+ */

Review comment:
       I added the above code. Unfortunately, we cannot draw this clear distinction, as we had to adapt a lot of the code, so there is no clear place to add the second comment.







[GitHub] [bahir-flink] AHeise commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r597121114



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <h3>Elements of line protocol</h3>
+ *
+ * <pre>
+ *
+ * measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200
+ * --------------- --------------- --------------------- -------------------
+ *      |               |                   |                     |
+ * Measurement       Tag set            Field set              Timestamp
+ *
+ * </pre>
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.
+ */
+public final class DataPoint {
+
+    private final String measurement;
+    private final Map<String, String> tags = new HashMap<>();
+    private final Map<String, Object> fields = new HashMap<>();
+    private final Long timestamp;
+
+    DataPoint(final String measurementName, @Nullable final Long timestamp) {
+        Arguments.checkNotNull(measurementName, "measurement");

Review comment:
       Maybe rather use Flink's `Preconditions` like in the Source?

##########
File path: flink-connector-influxdb2/src/main/resources/log4j2.properties
##########
@@ -0,0 +1,31 @@
+#

Review comment:
       Please remove this file entirely. It can easily overwrite the user's configuration during shading and confuse the heck out of them.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <h3>Elements of line protocol</h3>
+ *
+ * <pre>
+ *
+ * measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200
+ * --------------- --------------- --------------------- -------------------
+ *      |               |                   |                     |
+ * Measurement       Tag set            Field set              Timestamp
+ *
+ * </pre>
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.
+ */
+public final class DataPoint {
+
+    private final String measurement;
+    private final Map<String, String> tags = new HashMap<>();
+    private final Map<String, Object> fields = new HashMap<>();
+    private final Long timestamp;
+
+    DataPoint(final String measurementName, @Nullable final Long timestamp) {
+        Arguments.checkNotNull(measurementName, "measurement");
+        this.measurement = measurementName;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Converts the DataPoint object to {@link Point} object.
+     *
+     * @return {@link Point}.
+     */
+    public Point toPoint() {
+        final Point point = new Point(this.measurement);
+        point.time(this.timestamp, WritePrecision.NS);

Review comment:
       AFAIK, the timestamp is in ms. Probably worth documenting, given that it confused you/me.
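   
   If the incoming timestamps really are milliseconds (an assumption worth verifying), a minimal sketch of making that explicit:
   
   ```java
   // Assumption: parser timestamps are epoch milliseconds, not nanoseconds.
   // Declaring the precision explicitly avoids a silent NS/MS mismatch.
   point.time(this.timestamp, WritePrecision.MS);
   ```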

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles the incoming requests through the path /api/v2/write. The handle function
+ * reads each line in the body and uses the {@link InfluxParser} to parse them into {@link DataPoint}
+ * objects.
+ */
+@Internal
+public final class WriteAPIHandler extends Handler {
+    private static final Logger LOG = LoggerFactory.getLogger(WriteAPIHandler.class);
+
+    private final int maximumLinesPerRequest;
+    private final FutureCompletingBlockingQueue ingestionQueue;
+    private final int threadIndex;
+    private final long enqueueWaitTime;
+
+    public WriteAPIHandler(
+            final int maximumLinesPerRequest,
+            final FutureCompletingBlockingQueue ingestionQueue,
+            final int threadIndex,
+            final long enqueueWaitTime) {
+        this.maximumLinesPerRequest = maximumLinesPerRequest;
+        this.ingestionQueue = ingestionQueue;
+        this.threadIndex = threadIndex;
+        this.enqueueWaitTime = enqueueWaitTime;
+    }
+
+    @Override
+    public void handle(final HttpExchange t) throws IOException {
+        final BufferedReader in =
+                new BufferedReader(
+                        new InputStreamReader(t.getRequestBody(), StandardCharsets.UTF_8));
+
+        try {
+            String line;
+            final List<DataPoint> points = new ArrayList<>();
+            int numberOfLinesParsed = 0;
+            while ((line = in.readLine()) != null) {
+                final DataPoint dataPoint = InfluxParser.parseToDataPoint(line);
+                points.add(dataPoint);
+                numberOfLinesParsed++;
+                if (numberOfLinesParsed > this.maximumLinesPerRequest) {
+                    throw new RequestTooLargeException(
+                            String.format(
+                                    "Payload too large. Maximum number of lines per request is %d.",
+                                    this.maximumLinesPerRequest));
+                }
+            }
+
+            final boolean result =
+                    CompletableFuture.supplyAsync(
+                                    () -> {
+                                        try {
+                                            return this.ingestionQueue.put(
+                                                    this.threadIndex, points);
+                                        } catch (final InterruptedException e) {
+                                            return false;
+                                        }
+                                    })
+                            .get(this.enqueueWaitTime, TimeUnit.SECONDS);
+
+            if (!result) {
+                throw new TimeoutException("Failed to enqueue");
+            }
+
+            t.sendResponseHeaders(HttpURLConnection.HTTP_NO_CONTENT, -1);
+            this.ingestionQueue.notifyAvailable();
+        } catch (final ParseException e) {
+            Handler.sendResponse(t, HttpURLConnection.HTTP_BAD_REQUEST, e.getMessage());
+        } catch (final RequestTooLargeException e) {
+            Handler.sendResponse(t, HttpURLConnection.HTTP_ENTITY_TOO_LARGE, e.getMessage());
+        } catch (final TimeoutException e) {
+            final int HTTP_TOO_MANY_REQUESTS = 429;

Review comment:
       Extract this as a proper constant.
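   
   For example, a minimal sketch (`java.net.HttpURLConnection` defines no 429 constant, so the name below is our own):
   
   ```java
   /** HTTP 429 Too Many Requests; not provided by java.net.HttpURLConnection. */
   private static final int HTTP_TOO_MANY_REQUESTS = 429;
   ```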







[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r594090299



##########
File path: pom.xml
##########
@@ -93,7 +94,7 @@
 
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
-    <log4j2.version>2.13.3</log4j2.version>
+    <log4j2.version>2.12.1</log4j2.version>

Review comment:
       @eskabetxe I fixed the problem, and there is no need for the version downgrade :) A new commit is on its way! :rocket:







[GitHub] [bahir-flink] 1p4pk edited a comment on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk edited a comment on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-838304215


   @rmetzger squashed.





[GitHub] [bahir-flink] rmetzger commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-814852318


   I don't think we can add the presentation in `flink-connector-influxdb2/media/benchmarks.pdf` to Bahir.
   Effectively, we would be releasing the PDF under the ASL 2.0, but I'm pretty sure that it contains pictures which we haven't created ourselves.





[GitHub] [bahir-flink] raminqaf commented on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-845753611


   @rmetzger Sorry for all the continuous pushes... there were a bunch of conflicts and branch hell going on... everything should be in place and correct now!





[GitHub] [bahir-flink] bkahloon edited a comment on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
bkahloon edited a comment on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-822853231


   Hi @1p4pk, @Shark, @raminqaf,
   
   I was just wondering if the source connector would be usable via the Flink SQL API. If there isn't a direct integration available with this PR, what work do you suspect would be needed?
   
   Thank you for your help.





[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r591836255



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Committer;
+
+/**
+ * The InfluxDBCommitter implements the {@link Committer} interface. The InfluxDBCommitter is called
+ * whenever a checkpoint is set by Flink. When this class is called, it writes a checkpoint data
+ * point to InfluxDB. The checkpoint data point uses the latest written record timestamp.
+ */
+@Slf4j
+public final class InfluxDBCommitter implements Committer<Long> {
+
+    private final InfluxDBClient influxDBClient;
+    private final boolean writeCheckpoint;
+
+    public InfluxDBCommitter(final Properties properties) {
+        this.influxDBClient = getInfluxDBClient(properties);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+    }
+
+    /**
+     * This method is called only when a checkpoint is set and writes a checkpoint data point into
+     * InfluxDB. The {@link
+     * org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter} prepares the
+     * commit and fills the committable list with the latest timestamp. If the list contains a single
+     * element, it will be used as the timestamp of the data point. Otherwise, when no timestamp is
+     * provided, InfluxDB will use the current timestamp (UTC) of the host machine.
+     *
+     * <p>
+     *
+     * @param committables Contains the latest written timestamp.
+     * @return Empty list
+     * @see <a
+     *     href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#timestamp></a>
+     */
+    @SneakyThrows
+    @Override
+    public List<Long> commit(final List<Long> committables) {
+        if (this.writeCheckpoint) {
+            log.debug("A checkpoint is set.");
+            Optional<Long> lastTimestamp = Optional.empty();
+            if (committables.size() >= 1) {
+                lastTimestamp = Optional.ofNullable(committables.get(committables.size() - 1));
+            }
+            this.writeCheckpointDataPoint(lastTimestamp);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void close() {
+        this.influxDBClient.close();
+        log.debug("Closing the committer.");
+    }
+
+    private void writeCheckpointDataPoint(final Optional<Long> timestamp) {
+        try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
+            final Point point = new Point("checkpoint");
+            point.addField("checkpoint", "flink");
+            timestamp.ifPresent(aTime -> point.time(aTime, WritePrecision.NS));
+            writeApi.writePoint(point);
+            log.debug("Checkpoint data point write at {}", point.toLineProtocol());
+        }
+    }

Review comment:
       @AHeise Here we are opening a connection to InfluxDB and using a try-with-resources statement, since the `WriteApi` interface extends `AutoCloseable`. The statement doesn't catch any exceptions yet, and we were wondering how to catch the exceptions and handle them correctly.
   Here is a list of the InfluxDB-Client-Java [exceptions](https://github.com/influxdata/influxdb-client-java/tree/master/client-core/src/main/java/com/influxdb/exceptions).
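   
   For discussion, one hedged sketch (`com.influxdb.exceptions.InfluxException` is the client's common base exception; whether rethrowing is the right recovery strategy is exactly the open question):
   
   ```java
   private void writeCheckpointDataPoint(final Optional<Long> timestamp) {
       try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
           final Point point = new Point("checkpoint").addField("checkpoint", "flink");
           timestamp.ifPresent(aTime -> point.time(aTime, WritePrecision.NS));
           writeApi.writePoint(point);
       } catch (final InfluxException e) {
           // Sketch: surface the failure so Flink fails the checkpoint and can retry.
           throw new RuntimeException("Failed to write checkpoint data point", e);
       }
   }
   ```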







[GitHub] [bahir-flink] rmetzger commented on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-845027396


   Thanks a lot for the hotfix. Could you rebase to the latest master to see if the build now passes?





[GitHub] [bahir-flink] 1p4pk commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-823026663


   @AHeise we added the last suggestions, but I could only re-request a review from one person. Can you have a look again?





[GitHub] [bahir-flink] 1p4pk commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r623800539



##########
File path: flink-connector-influxdb2/license-header
##########
@@ -0,0 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review comment:
       This can be deleted. It was used for `spotless:apply`.







[GitHub] [bahir-flink] raminqaf edited a comment on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf edited a comment on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-838786735


   @rmetzger Everything should be ready and set for the merge now! :rocket: Only one small thing:
   I noticed that the Travis CI pipeline is failing (for both environments, Scala 2.11 & 2.12) due to three tests in the `flume` connector. Here is a screenshot of the Travis logs:
   ![image](https://user-images.githubusercontent.com/20357405/117848190-561af800-b283-11eb-9bde-e4946202793f.png)
   I found a quick fix for it: I just updated to the newest version of `testcontainers`, and it seems that this fixes the problem. I created a [PR](https://github.com/apache/bahir-flink/pull/123) for this issue, and the pipeline there passes successfully. I didn't know how to create an issue on Jira, but maybe you can help me out with that :) Also, I couldn't add anyone as a reviewer on my PR.





[GitHub] [bahir-flink] 1p4pk commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r616387907



##########
File path: pom.xml
##########
@@ -75,6 +75,7 @@
     <module>flink-connector-akka</module>
     <module>flink-connector-flume</module>
     <module>flink-connector-influxdb</module>
+    <module>flink-connector-influxdb2</module>

Review comment:
       I guess yes, because we implemented InfluxDB 2.x, which is still quite new. I guess a lot of people still use the old API.







[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r593939004



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {
+        return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
+    }
+
+    public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {

Review comment:
       I did the check and added some tests for the Builder class.
   Should we describe the trade-off in the README or here as Javadoc?
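   
   If it goes here as Javadoc, a hedged sketch (the wording and the positivity check are assumptions, with `checkArgument` being Flink's `Preconditions.checkArgument`):
   
   ```java
   /**
    * Sets the buffer size for batching writes to InfluxDB.
    *
    * <p>Trade-off: a larger buffer batches more points per request and raises
    * throughput, but uses more memory and risks more unflushed data on failure;
    * a smaller buffer lowers both at the cost of more HTTP requests.
    */
   public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {
       checkArgument(bufferSize > 0, "The buffer size should be greater than 0.");
       return this.setProperty(WRITE_BUFFER_SIZE.key(), String.valueOf(bufferSize));
   }
   ```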







[GitHub] [bahir-flink] raminqaf commented on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-838786735


   @rmetzger Everything should be ready and set for the merge now! :rocket: Only one small thing:
   I noticed that the Travis CI pipeline is failing (for both environments, Scala 2.11 & 2.12) due to three tests in the `flume` connector. Here is a screenshot of the Travis logs:
   ![image](https://user-images.githubusercontent.com/20357405/117848190-561af800-b283-11eb-9bde-e4946202793f.png)
   I dug deeper and found a quick fix for it: I just updated to the newest version of `testcontainers`, and the tests now run locally. I created a [PR](https://github.com/apache/bahir-flink/pull/123) for this issue; the pipeline there passes. I didn't know how to create an issue on Jira, but maybe you can help me out with that :) Also, I couldn't add anyone as a reviewer on my PR.





[GitHub] [bahir-flink] AHeise commented on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
AHeise commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-838326951


   Probably easiest to (re)start from the latest unsquashed commit `03219bf55f6de444dff62422f29062b7a9c1451c`.





[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r593945598



##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). Our sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server is started per SplitReader, which parses incoming HTTP requests into our Data Point class. Each Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, you have two choices to configure it. You can either configure its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) for writing to the running HTTP servers or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http) for that:
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http://task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The timeout in seconds for enqueuing an HTTP request to the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers data points from HTTP requests before fetching. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the split reader's HTTP server runs. | 8000 |
+
+### Supported Data Types in Field Set
+
+| Field Set     | Support       | 
+| ------------- |:-------------:| 
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |

Review comment:
       Here is the issue:
   https://github.com/apache/druid/issues/10993




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-831049180


   Thanks for the additional approval @eskabetxe. I'll merge this now!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] AHeise commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
AHeise commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r592999342



##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).

Review comment:
       Add CDC to description.

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).

Review comment:
       `link` -> `shade`

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader and parses HTTP requests into our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, there are two ways to configure it: point its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) at the running HTTP servers, or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+        
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The timeout in seconds for enqueuing an HTTP request to the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers data points from HTTP requests before fetching. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the split reader's HTTP server runs. | 8000 |

Review comment:
       👍 

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader and parses HTTP requests into our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.

Review comment:
       Not sure if users know what a SplitReader is. How about replacing it with "source instance"?

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml=
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader and parses HTTP requests into our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, there are two ways to configure it: point its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) at the running HTTP servers, or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)

Review comment:
       👍 

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader and parses HTTP requests into our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, there are two ways to configure it: point its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) at the running HTTP servers, or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+        
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The timeout in seconds for enqueuing an HTTP request to the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers data points from HTTP requests before fetching. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the split reader's HTTP server runs. | 8000 |
+
+### Supported Data Types in Field Set
+
+| Field Set     | Support       | 
+| ------------- |:-------------:| 
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |
+|    String     | ✅            |
+|    Boolean    | ✅            |
+
+See InfluxDB field set value [data type](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set).
+ 
+
+## Sink
+
+The Sink writes data points to InfluxDB using the [InfluxDB Java Client](https://github.com/influxdata/influxdb-client-java). You provide the connection information (URL, username, password, bucket, and organization) and an implementation of the generic `InfluxDBSchemaSerializer<IN>` interface. The implementation of the interface overrides the `serialize(IN element, Context context)` function. This function serializes incoming Flink elements of type `IN` to [Point](https://github.com/influxdata/influxdb-client-java/blob/master/client/src/main/java/com/influxdb/client/write/Point.java) objects.
+
+It is possible to write multiple data points to InfluxDB simultaneously by separating each point with a new line. Batching data points in this manner results in much higher performance. The batch size can be set through the `WRITE_BUFFER_SIZE` option. By default, the buffer size is set to 1000 and can be changed to any value using the `setWriteBufferSize(final int bufferSize)` method of the Sink builder class.
+
+It is possible to write checkpoint data points to InfluxDB whenever Flink takes a checkpoint. To enable this functionality, you need to set the `WRITE_DATA_POINT_CHECKPOINT` flag to true (default is false). The checkpoint data point looks as follows:
+```text
+checkpoint checkpoint=flink <timestamp>
+```
+The timestamp refers to the latest element that Flink serializes.
+
+### Usage
+
+```java
+// The InfluxDB Sink uses the builder pattern to create a Sink object
+InfluxDBSink<Long> influxDBSink = InfluxDBSink.<Long>builder()

Review comment:
       Similar idea: leave `builder()` untyped?

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader and parses HTTP requests into our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, there are two ways to configure it: point its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) at the running HTTP servers, or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()

Review comment:
       Maybe builder should be untyped (`?`) until the user sets the deserializer?
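
       A minimal sketch of that idea, assuming a hypothetical `InfluxDBSourceBuilder` (names and fields here are illustrative, not the PR's actual API): the builder starts out untyped, and `setDeserializer` rebinds the type parameter.

```java
public final class InfluxDBSourceBuilder<OUT> {
    private InfluxDBDataPointDeserializer<OUT> deserializer;

    // The unchecked cast rebinds the builder's type parameter once the
    // deserializer fixes the element type.
    @SuppressWarnings("unchecked")
    public <T> InfluxDBSourceBuilder<T> setDeserializer(
            final InfluxDBDataPointDeserializer<T> deserializer) {
        final InfluxDBSourceBuilder<T> typedBuilder = (InfluxDBSourceBuilder<T>) this;
        typedBuilder.deserializer = deserializer;
        return typedBuilder;
    }
}
```

       With that, `InfluxDBSource.builder().setDeserializer(new TestDeserializer()).build()` would infer `Long` without the explicit `<Long>` type witness.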

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader and parses HTTP requests into our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, there are two ways to configure it: point its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) at the running HTTP servers, or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+        
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The timeout in seconds for enqueuing an HTTP request to the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers data points from HTTP requests before fetching. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the split reader's HTTP server runs. | 8000 |
+
+### Supported Data Types in Field Set
+
+| Field Set     | Support       | 
+| ------------- |:-------------:| 
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |
+|    String     | ✅            |
+|    Boolean    | ✅            |
+
+See InfluxDB field set value [data type](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set).
+ 
+
+## Sink
+
+The Sink writes data points to InfluxDB using the [InfluxDB Java Client](https://github.com/influxdata/influxdb-client-java). You provide the connection information (URL, username, password, bucket, and organization) and an implementation of the generic `InfluxDBSchemaSerializer<IN>` interface. The implementation of the interface overrides the `serialize(IN element, Context context)` function. This function serializes incoming Flink elements of type `IN` to [Point](https://github.com/influxdata/influxdb-client-java/blob/master/client/src/main/java/com/influxdb/client/write/Point.java) objects.
+
+It is possible to write multiple data points to InfluxDB simultaneously by separating each point with a new line. Batching data points in this manner results in much higher performance. The batch size can be set through the `WRITE_BUFFER_SIZE` option. By default, the buffer size is set to 1000 and can be changed to any value using the `setWriteBufferSize(final int bufferSize)` method of the Sink builder class.
+
+It is possible to write checkpoint data points to InfluxDB whenever Flink takes a checkpoint. To enable this functionality, you need to set the `WRITE_DATA_POINT_CHECKPOINT` flag to true (default is false). The checkpoint data point looks as follows:
+```text
+checkpoint checkpoint=flink <timestamp>
+```
+The timestamp refers to the latest element that Flink serializes.
+
+### Usage
+
+```java
+// The InfluxDB Sink uses the builder pattern to create a Sink object
+InfluxDBSink<Long> influxDBSink = InfluxDBSink.<Long>builder()
+        .setInfluxDBSchemaSerializer(new TestSerializer())
+        .setInfluxDBUrl(getUrl())           // http://localhost:8086
+        .setInfluxDBUsername(getUsername()) // admin
+        .setInfluxDBPassword(getPassword()) // admin
+        .setInfluxDBBucket(getBucket())     // default
+        .setInfluxDBOrganization(getOrg())  // influxdata
+        .build();
+        
+// ...
+
+/**
+ * Implementation of InfluxDBSchemaSerializer interface
+ * (element) -----> (dataPoint)
+ *  1L -----------> test,longValue=1 fieldKey="fieldValue"
+ *  2L -----------> test,longValue=2 fieldKey="fieldValue"
+ *  3L -----------> test,longValue=3 fieldKey="fieldValue"           
+ */
+class TestSerializer implements InfluxDBSchemaSerializer<Long> {
+
+    @Override
+    public Point serialize(Long element, Context context) {
+        final Point dataPoint = new Point("test");
+        dataPoint.addTag("longValue", String.valueOf(element));
+        dataPoint.addField("fieldKey", "fieldValue");
+        return dataPoint;
+    }
+}
+```
+
+### Options
+
+| Option            | Description   | Default Value   |
+| ----------------- |-----------------|:-----------------:|
+| WRITE_DATA_POINT_CHECKPOINT | Determines if the checkpoint data point should be written to InfluxDB or not. | false |
+| WRITE_BUFFER_SIZE | Number of elements to buffer before they are written to InfluxDB. | 1000 |
+| INFLUXDB_URL | InfluxDB Connection URL. | ❌ |
+| INFLUXDB_USERNAME | InfluxDB username. | ❌ |
+| INFLUXDB_PASSWORD | InfluxDB password. | ❌ |
+| INFLUXDB_BUCKET | InfluxDB bucket. | ❌ |
+| INFLUXDB_ORGANIZATION | InfluxDB organization. | ❌ |

Review comment:
       :+1:

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {

Review comment:
       `Number`

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import lombok.Getter;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.
+ */
+public final class DataPoint {
+    @Getter private final String name;
+    private final Map<String, String> tags = new HashMap();
+    private final Map<String, Object> fields = new HashMap();
+    @Getter private final Number timestamp;

Review comment:
       `Long`?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {
+        if (raw.endsWith("i")) {
+            return Long.valueOf(raw.substring(0, raw.length() - 1));
+        }
+
+        return new Double(raw);
+    }
+
+    private Object parseBool(final String raw) {

Review comment:
       Should probably be `Boolean`.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {

Review comment:
       Parameter name copy & paste issue.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {

Review comment:
       This deserves proper documentation and should probably also get an additional section in the README.md.
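
       A hedged sketch of the Javadoc this setter could carry, based only on the README's description of the checkpoint data point (the wording is a suggestion, not the PR's actual documentation):

```java
/**
 * Enables writing a checkpoint data point to InfluxDB whenever Flink takes a
 * checkpoint. The data point carries the timestamp of the latest element the
 * sink serialized, in the form {@code checkpoint checkpoint=flink <timestamp>}.
 *
 * @param shouldWrite true to write a data point per checkpoint; defaults to false.
 */
public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {
    return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
}
```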

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {
+        return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
+    }
+
+    public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {
+        return this.setProperty(WRITE_BUFFER_SIZE.key(), String.valueOf(bufferSize));
+    }
+
+    public InfluxDBSink<IN> build() {
+        this.sanityCheck();
+        return new InfluxDBSink<>(this.influxDBSchemaSerializer, this.properties);
+    }
+
+    // ------------- private helpers  --------------
+    /**
+     * Set an arbitrary property for the InfluxDBSink. The valid keys can be found in {@link
+     * InfluxDBSinkOptions}.
+     *
+     * @param key the key of the property.
+     * @param value the value of the property.
+     * @return this InfluxDBSinkBuilder.
+     */
+    private InfluxDBSinkBuilder<IN> setProperty(final String key, final String value) {
+        this.properties.setProperty(key, value);
+        return this;
+    }
+
+    /** Checks if the SchemaSerializer and the influxDBConfig are not null and set. */
+    private void sanityCheck() {
+        // Check required settings.
+        checkNotNull(

Review comment:
       + other mandatory fields.
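
       For illustration, a minimal sketch of the extended check, assuming all five connection settings plus the serializer are mandatory:

```java
/** Checks that the schema serializer and all mandatory connection settings are set. */
private void sanityCheck() {
    checkNotNull(this.influxDBSchemaSerializer, "No schema serializer was set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_URL.key()), "No InfluxDB URL was set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_USERNAME.key()), "No InfluxDB username was set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_PASSWORD.key()), "No InfluxDB password was set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_BUCKET.key()), "No InfluxDB bucket was set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_ORGANIZATION.key()), "No InfluxDB organization was set.");
}
```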

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxDBOptionsBase.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.util.Properties;
+import java.util.function.Function;
+import org.apache.flink.configuration.ConfigOption;
+
+public abstract class InfluxDBOptionsBase {

Review comment:
       Base class seems to be unnecessary. How about replacing `Properties` with `org.apache.flink.configuration.Configuration`, which would give you the type-safety that you desire?
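
       A small sketch of the typed access `Configuration` would give (the option keys below are illustrative, not the keys the PR actually declares):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

class ConfigurationSketch {
    // Illustrative options; InfluxDBSinkOptions would declare these.
    static final ConfigOption<String> INFLUXDB_URL =
            ConfigOptions.key("sink.influxdb.url").stringType().noDefaultValue();
    static final ConfigOption<Integer> WRITE_BUFFER_SIZE =
            ConfigOptions.key("sink.influxdb.write.buffer.size").intType().defaultValue(1000);

    public static void main(final String[] args) {
        final Configuration configuration = new Configuration();
        configuration.set(INFLUXDB_URL, "http://localhost:8086");

        // Typed reads: no String parsing; a missing key falls back to the
        // option's declared default instead of null.
        final String url = configuration.get(INFLUXDB_URL);
        final int bufferSize = configuration.get(WRITE_BUFFER_SIZE);
        System.out.println(url + " (buffer size " + bufferSize + ")");
    }
}
```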

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {
+        return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
+    }
+
+    public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {

Review comment:
       Check `bufferSize > 0`. Describe the latency <-> throughput trade-off?
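
       A hedged sketch of that check, reusing Flink's `Preconditions` (a `checkArgument` static import would be needed alongside the existing `checkNotNull`):

```java
public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {
    // Reject non-positive sizes; larger buffers batch more points per write
    // (higher throughput), smaller buffers flush sooner (lower latency).
    checkArgument(bufferSize > 0, "The buffer size should be greater than 0.");
    return this.setProperty(WRITE_BUFFER_SIZE.key(), String.valueOf(bufferSize));
}
```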

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * This class serializes and deserializes the commit values. Since we send the timestamp value
+ * as a committable, the Long object is (de)serialized.
+ */
+public final class InfluxDBCommittableSerializer implements SimpleVersionedSerializer<Long> {
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(final Long value) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(0, value);
+        return buffer.array();
+    }
+
+    @Override
+    public Long deserialize(final int version, final byte[] serialized) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);

Review comment:
       `ByteBuffer.wrap`?
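
   For example, the deserialization could avoid the extra buffer allocation along these lines (a sketch):

   ```java
   @Override
   public Long deserialize(final int version, final byte[] serialized) {
       // Wrap the incoming bytes directly instead of allocating a new buffer and copying.
       return ByteBuffer.wrap(serialized).getLong();
   }
   ```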

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Committer;
+
+/**
+ * The InfluxDBCommitter implements the {@link Committer} interface. The InfluxDBCommitter is
+ * called whenever a checkpoint is set by Flink. When called, it writes a checkpoint data point
+ * to InfluxDB. The checkpoint data point uses the latest written record timestamp.
+ */
+@Slf4j
+public final class InfluxDBCommitter implements Committer<Long> {
+
+    private final InfluxDBClient influxDBClient;
+    private final boolean writeCheckpoint;
+
+    public InfluxDBCommitter(final Properties properties) {
+        this.influxDBClient = getInfluxDBClient(properties);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+    }
+
+    /**
+     * This method is called only when a checkpoint is set and writes a checkpoint data point into
+     * InfluxDB. The {@link
+     * org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter} prepares the
+     * commit and fills the committable list with the latest timestamp. If the list contains a
+     * single element, it will be used as the timestamp of the data point. Otherwise, when no
+     * timestamp is provided, InfluxDB will use the current timestamp (UTC) of the host machine.
+     *
+     * <p>
+     *
+     * @param committables Contains the latest written timestamp.
+     * @return Empty list
+     * @see <a
+     *     href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#timestamp">Line Protocol Timestamp</a>
+     */
+    @SneakyThrows

Review comment:
       Translate any checked exceptions (are there any?) into `IOException` and use an explicit signature.
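
   A hedged sketch of what the explicit signature could look like, assuming the InfluxDB client itself only throws unchecked exceptions (`commitDataPoints` is a hypothetical helper):

   ```java
   @Override
   public List<Long> commit(final List<Long> committables) throws IOException {
       try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
           // Hypothetical helper that writes the checkpoint data point, as the current code does.
           this.commitDataPoints(writeApi, committables);
       } catch (final RuntimeException e) {
           throw new IOException("Could not write the checkpoint data point to InfluxDB.", e);
       }
       return Collections.emptyList();
   }
   ```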

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming records to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all buffered
+ * items are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the
+ * incoming elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element by comparing it with
+     * context.timestamp() and keeping the larger (more recent) value.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());
+                this.elements.add(this.schemaSerializer.serialize(in, context));
+                if (context.timestamp() != null) {
+                    this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+                }
+            }
+        } catch (final Exception e) {
+            log.error(e.getMessage());
+        }
+    }
+
+    /**
+     * This method is called whenever a checkpoint is set by Flink. It creates a list and feels it
+     * up with the latest timestamp.
+     *
+     * @param flush
+     * @return A list containing 0 or 1 element
+     */
+    @Override
+    public List<Long> prepareCommit(final boolean flush) {
+        if (this.lastTimestamp == 0) {
+            return Collections.emptyList();
+        }
+        final List<Long> lastTimestamp = new ArrayList<>(1);
+        lastTimestamp.add(this.lastTimestamp);
+        return lastTimestamp;
+    }
+
+    @Override
+    public List<Point> snapshotState() {
+        return this.elements;
+    }
+
+    @Override
+    public void close() throws Exception {
+        log.debug("Preparing to write the elements in close.");
+        this.writeCurrentElements();
+        log.debug("Closing the writer.");
+        this.elements.clear();
+    }
+
+    public void setProcessingTimerService(final ProcessingTimeService processingTimerService) {
+        this.processingTimerService = processingTimerService;
+    }
+
+    private void writeCurrentElements() throws Exception {
+        try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {

Review comment:
       Did you verify that this is cheap enough to do on each batch? I guess it has to be as this is the only way to do batch writes, right?
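
   If per-batch creation turned out to be too expensive, one alternative sketch would be to hold a single instance for the writer's lifetime (assumes hypothetical `writeApi`, `bucketName`, and `organizationName` fields that the current code does not have):

   ```java
   // Hypothetical alternative: create the WriteApi once in the constructor and reuse it per batch.
   private void writeCurrentElements() {
       this.writeApi.writePoints(this.bucketName, this.organizationName, this.elements);
       this.elements.clear();
   }

   @Override
   public void close() throws Exception {
       this.writeApi.close(); // flushes any pending writes
       this.influxDBClient.close();
   }
   ```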

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming records to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all buffered
+ * items are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the
+ * incoming elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element by comparing it with
+     * context.timestamp() and keeping the larger (more recent) value.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());

Review comment:
       Remove or `trace`.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming records to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all buffered
+ * items are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the
+ * incoming elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element by comparing it with
+     * context.timestamp() and keeping the larger (more recent) value.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());
+                this.elements.add(this.schemaSerializer.serialize(in, context));
+                if (context.timestamp() != null) {
+                    this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+                }
+            }
+        } catch (final Exception e) {
+            log.error(e.getMessage());
+        }
+    }
+
+    /**
+     * This method is called whenever a checkpoint is set by Flink. It creates a list and feels it

Review comment:
       fills?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming records to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all buffered
+ * items are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the
+ * incoming elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element by comparing it with
+     * context.timestamp() and keeping the larger (more recent) value.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());
+                this.elements.add(this.schemaSerializer.serialize(in, context));
+                if (context.timestamp() != null) {
+                    this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+                }
+            }
+        } catch (final Exception e) {
+            log.error(e.getMessage());
+        }
+    }
+
+    /**
+     * This method is called whenever a checkpoint is set by Flink. It creates a list and feels it
+     * up with the latest timestamp.
+     *
+     * @param flush
+     * @return A list containing 0 or 1 element
+     */
+    @Override
+    public List<Long> prepareCommit(final boolean flush) {
+        if (this.lastTimestamp == 0) {
+            return Collections.emptyList();
+        }
+        final List<Long> lastTimestamp = new ArrayList<>(1);
+        lastTimestamp.add(this.lastTimestamp);
+        return lastTimestamp;
+    }
+
+    @Override
+    public List<Point> snapshotState() {
+        return this.elements;
+    }
+
+    @Override
+    public void close() throws Exception {
+        log.debug("Preparing to write the elements in close.");
+        this.writeCurrentElements();
+        log.debug("Closing the writer.");

Review comment:
       Where? ;)
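
   Presumably the underlying client should be closed here too, e.g. (a sketch):

   ```java
   @Override
   public void close() throws Exception {
       log.debug("Preparing to write the elements in close.");
       this.writeCurrentElements();
       this.elements.clear();
       // Close the underlying client so that "closing the writer" actually happens.
       this.influxDBClient.close();
   }
   ```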

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumState;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumStateSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSplitEnumerator;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBRecordEmitter;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSourceReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSplitReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplitSerializer;
+
+/**
+ * The Source implementation of InfluxDB. Please use an {@link InfluxDBSourceBuilder} to construct
+ * an {@link InfluxDBSource}. The following example shows how to create an InfluxDBSource emitting
+ * records of <code>String</code> type.
+ *
+ * <p>See {@link InfluxDBSourceBuilder} for more details.
+ *
+ * @param <OUT> the output type of the source.
+ */
+public final class InfluxDBSource<OUT>
+        implements Source<OUT, InfluxDBSplit, InfluxDBSourceEnumState>, ResultTypeQueryable<OUT> {
+
+    private final Properties properties;
+    private final InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+
+    InfluxDBSource(
+            final Properties properties,
+            final InfluxDBDataPointDeserializer<OUT> deserializationSchema) {
+        this.properties = properties;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    /**
+     * Get an {@link InfluxDBSourceBuilder} to build an {@link InfluxDBSource}.
+     *
+     * @return an InfluxDB source builder.
+     */
+    public static <OUT> InfluxDBSourceBuilder<OUT> builder() {
+        return new InfluxDBSourceBuilder<>();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, InfluxDBSplit> createReader(
+            final SourceReaderContext sourceReaderContext) {
+        final Supplier<InfluxDBSplitReader> splitReaderSupplier =
+                () -> new InfluxDBSplitReader(this.properties);
+        final InfluxDBRecordEmitter<OUT> recordEmitter =
+                new InfluxDBRecordEmitter<>(this.deserializationSchema);
+
+        return new InfluxDBSourceReader<>(
+                splitReaderSupplier,
+                recordEmitter,
+                this.toConfiguration(this.properties),
+                sourceReaderContext);
+    }
+
+    @Override
+    public SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> createEnumerator(
+            final SplitEnumeratorContext<InfluxDBSplit> splitEnumeratorContext) {
+        return new InfluxDBSplitEnumerator(splitEnumeratorContext);
+    }
+
+    @Override
+    public SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> restoreEnumerator(
+            final SplitEnumeratorContext<InfluxDBSplit> splitEnumeratorContext,
+            final InfluxDBSourceEnumState influxDBSourceEnumState) {
+        return null;

Review comment:
       This needs to be implemented. Since you don't have any state, you could just use `new InfluxDBSplitEnumerator(splitEnumeratorContext)`. 
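
   Following that suggestion, the restore path could simply rebuild the enumerator, since there is no state to restore (a sketch):

   ```java
   @Override
   public SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> restoreEnumerator(
           final SplitEnumeratorContext<InfluxDBSplit> splitEnumeratorContext,
           final InfluxDBSourceEnumState influxDBSourceEnumState) {
       // No enumerator state exists, so restoring is the same as creating a fresh enumerator.
       return new InfluxDBSplitEnumerator(splitEnumeratorContext);
   }
   ```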

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceOptions.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import java.util.Properties;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxDBOptionsBase;
+
+/* Configurations for an InfluxDBSource. */
+public final class InfluxDBSourceOptions extends InfluxDBOptionsBase {
+
+    private InfluxDBSourceOptions() {}
+
+    public static final ConfigOption<Long> ENQUEUE_WAIT_TIME =
+            ConfigOptions.key("source.influxDB.timeout.enqueue")
+                    .longType()
+                    .defaultValue(5L)
+                    .withDescription(
+                            "The time out in seconds for enqueuing an HTTP request to the queue.");
+
+    public static final ConfigOption<Integer> INGEST_QUEUE_CAPACITY =
+            ConfigOptions.key("source.influxDB.queue_capacity.ingest")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "Size of queue that buffers HTTP requests data points before fetching.");
+
+    public static final ConfigOption<Integer> MAXIMUM_LINES_PER_REQUEST =
+            ConfigOptions.key("source.influxDB.limit.lines_per_request")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The maximum number of lines that should be parsed per HTTP request.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("source.influxDB.port")
+                    .intType()
+                    .defaultValue(8000)
+                    .withDescription(
+                            "TCP port on which the split reader's HTTP server is running on.");
+
+    public static long getEnqueueWaitTime(final Properties properties) {
+        return getOption(properties, ENQUEUE_WAIT_TIME, Long::parseLong);
+    }
+
+    public static int getIngestQueueCapacity(final Properties properties) {
+        return getOption(properties, INGEST_QUEUE_CAPACITY, Integer::parseInt);
+    }
+
+    public static int getMaximumLinesPerRequest(final Properties properties) {
+        return getOption(properties, MAXIMUM_LINES_PER_REQUEST, Integer::parseInt);
+    }
+
+    public static int getPort(final Properties properties) {
+        return getOption(properties, PORT, Integer::parseInt);
+    }

Review comment:
       Could be removed if you switch to Configuration.
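
   For illustration, Flink's `Configuration` provides the typed accessors out of the box (a sketch, not the connector's current API):

   ```java
   final Configuration configuration = new Configuration();
   configuration.set(INGEST_QUEUE_CAPACITY, 2000);                // typed setter
   final int capacity = configuration.get(INGEST_QUEUE_CAPACITY); // typed getter, no Integer::parseInt
   ```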

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSourceEnumStateSerializer.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.enumerator;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/** The InfluxDB source is stateless, since its HTTP request source cannot be replayed. */

Review comment:
       This needs to be documented in the README.md and in the source as well.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.enumerator;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import org.jetbrains.annotations.Nullable;
+
+/** The enumerator class for InfluxDB source. */
+@Internal
+public final class InfluxDBSplitEnumerator
+        implements SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> {
+
+    private final SplitEnumeratorContext<InfluxDBSplit> context;
+
+    public InfluxDBSplitEnumerator(final SplitEnumeratorContext<InfluxDBSplit> context) {
+        this.context = checkNotNull(context);
+    }
+
+    @Override
+    public void start() {
+        // no resources to start
+    }
+
+    @Override
+    public void handleSplitRequest(final int subtaskId, @Nullable final String requesterHostname) {
+        this.context.assignSplit(new InfluxDBSplit(subtaskId), subtaskId);
+    }
+
+    @Override
+    public void addSplitsBack(final List<InfluxDBSplit> splits, final int subtaskId) {}
+
+    @Override
+    public void addReader(final int subtaskId) {
+        // this source is purely lazy-pull-based, nothing to do upon registration
+    }
+
+    @Override
+    public InfluxDBSourceEnumState snapshotState() throws Exception {
+        return null;

Review comment:
       return the empty state
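
   For instance, assuming `InfluxDBSourceEnumState` has (or gets) a no-argument constructor:

   ```java
   @Override
   public InfluxDBSourceEnumState snapshotState() {
       // Nothing to capture; return an empty state object instead of null.
       return new InfluxDBSourceEnumState();
   }
   ```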

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.jetbrains.annotations.NotNull;
+
+abstract class Handler implements HttpHandler {

Review comment:
       Quick doc
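
   For example, something along these lines (wording is only a suggestion):

   ```java
   /**
    * Base class for the HTTP handlers of the split reader's embedded HTTP server. Subclasses
    * implement {@link HttpHandler#handle(HttpExchange)} for one specific endpoint.
    */
   abstract class Handler implements HttpHandler {
   ```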

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {
+        if (raw.endsWith("i")) {
+            return Long.valueOf(raw.substring(0, raw.length() - 1));
+        }
+
+        return new Double(raw);
+    }
+
+    private Object parseBool(final String raw) {
+        final char first = raw.charAt(0);
+        if (first == 't' || first == 'T') {
+            return "true";
+        } else {
+            return "false";
+        }
+    }
+
+    private String parseIdentifier(final InfluxLineProtocolParser.IdentifierContext ctx) {
+        if (ctx.BOOLEAN() != null || ctx.NUMBER() != null) {
+            return ctx.getText();
+        }
+
+        return IDENTIFIER_PATTERN.matcher(ctx.IDENTIFIER_STRING().getText()).replaceAll("$1");
+    }
+
+    private Number parseTimestamp(@Nullable final TimestampContext timestamp) {

Review comment:
       Long?
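
   The return type could indeed be narrowed, e.g. (a sketch; assumes the grammar's timestamp token is a plain integer string):

   ```java
   @Nullable
   private Long parseTimestamp(@Nullable final TimestampContext timestamp) {
       if (timestamp == null) {
           return null; // InfluxDB will then assign a server-side timestamp
       }
       return Long.valueOf(timestamp.getText().trim());
   }
   ```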

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). Our sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server is started per SplitReader; it parses incoming HTTP requests into our Data Point class. Each Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, there are two ways to configure it. You can either point its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) at the running HTTP servers or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http://task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The timeout in seconds for enqueuing an HTTP request into the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers the data points of incoming HTTP requests before they are fetched. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 1000 |
+| PORT | TCP port on which the split reader's HTTP server is running. | 8000 |
+
+### Supported Data Types in Field Set
+
+| Field Set     | Support       | 
+| ------------- |:-------------:| 
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |

Review comment:
       The UInt limitation comes directly from Druid, right? Could you check if there is a ticket and link it?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);

Review comment:
       This seems to be rather costly for each line. Could you please double-check if there is a way to reuse the existing Druid parser objects across lines?
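
   If reuse is possible, it could look roughly like this (hedged; assumes the lexer, token stream, and parser are kept as fields and that ANTLR's reset semantics hold for the generated Druid lexer/parser):

   ```java
   // Reuse one lexer/parser pair per reader instead of allocating new objects per line.
   this.lexer.setInputStream(CharStreams.fromString(input));
   this.tokenStream.setTokenSource(this.lexer);
   this.parser.setTokenStream(this.tokenStream); // setTokenStream also resets the parser state
   ```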

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import java.io.IOException;
+
+public final class HealthCheckHandler extends Handler {

Review comment:
       Quick doc
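
   Something brief would do, for instance:

   ```java
   /** Answers health-check requests (e.g. from Telegraf) with an HTTP 200 response. */
   public final class HealthCheckHandler extends Handler {
   ```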

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+
+@Slf4j
+public final class WriteAPIHandler extends Handler {
+    private final InfluxParser parser = new InfluxParser();
+    private final int maximumLinesPerRequest;
+    private final FutureCompletingBlockingQueue ingestionQueue;
+    private final int threadIndex;
+    private final long enqueueWaitTime;
+
+    public WriteAPIHandler(
+            final int maximumLinesPerRequest,
+            final FutureCompletingBlockingQueue ingestionQueue,
+            final int threadIndex,
+            final long enqueueWaitTime) {
+        this.maximumLinesPerRequest = maximumLinesPerRequest;
+        this.ingestionQueue = ingestionQueue;
+        this.threadIndex = threadIndex;
+        this.enqueueWaitTime = enqueueWaitTime;
+    }
+
+    @Override
+    public void handle(final HttpExchange t) throws IOException {
+        final BufferedReader in =
+                new BufferedReader(
+                        new InputStreamReader(t.getRequestBody(), StandardCharsets.UTF_8));
+
+        try {
+            String line;
+            final List<DataPoint> points = new ArrayList<>();
+            int numberOfLinesParsed = 0;
+            while ((line = in.readLine()) != null) {
+                final DataPoint dataPoint = this.parser.parseToDataPoint(line);
+                points.add(dataPoint);
+                numberOfLinesParsed++;
+                if (numberOfLinesParsed > this.maximumLinesPerRequest) {
+                    throw new RequestTooLargeException(
+                            String.format(
+                                    "Payload too large. Maximum number of lines per request is %d.",
+                                    this.maximumLinesPerRequest));
+                }
+            }

Review comment:
       This part is quite confusing to me and probably inefficient:
   - You are splitting the request into lines.
   - Then you are creating a parser for each line and parsing it.
   - You are creating quite a bit of intermediate objects.
   
   Wouldn't it be possible to just parse the whole input stream through antlr? You'd create the parser once per batch and have no intermediate representations.
   
   You can probably leverage the `Stream` API to parse the lines as you go to not go over your queue limit.
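
   A rough sketch of that whole-body approach (assuming a recent ANTLR runtime with `CharStreams` and that the grammar's `lines` rule accepts the complete newline-separated payload):

   ```java
   final CharStream charStream =
           CharStreams.fromStream(t.getRequestBody(), StandardCharsets.UTF_8);
   final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
   final InfluxLineProtocolParser parser =
           new InfluxLineProtocolParser(new CommonTokenStream(lexer));
   // One parse for the entire request; each LineContext then maps to one DataPoint.
   final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
   ```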
   

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {

Review comment:
       The return type should be `String`, not `Object`.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import lombok.Getter;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.

Review comment:
       I'd add an example/pattern of a datapoint here.
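   For example (made-up values):
   
   ```
   test,location=berlin temperature=21.5,humidity=61i 1556813561098000000
   ```
   
   i.e. measurement `test`, one tag (`location=berlin`), two fields (a float and an integer), and an optional nanosecond timestamp.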

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import com.influxdb.client.write.Point;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import lombok.Getter;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.commiter.InfluxDBCommittableSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.commiter.InfluxDBCommitter;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBPointSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
+
+public final class InfluxDBSink<IN> implements Sink<IN, Long, Point, Void> {

Review comment:
       Add docs + usage example (recycle from readme).
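   Even a short snippet would do, e.g. (sketch only; `setInfluxDBUrl` and `build()` exist in this PR, the remaining method names are my assumption and mirror the `INFLUXDB_*` option keys; `stream` stands for a `DataStream<Long>` and `MyPointSerializer` is a hypothetical user serializer):
   
   ```java
   final InfluxDBSink<Long> influxDBSink =
           new InfluxDBSinkBuilder<Long>()
                   .setInfluxDBSchemaSerializer(new MyPointSerializer()) // hypothetical
                   .setInfluxDBUrl("http://localhost:8086")              // default InfluxDB 2.x port
                   .setInfluxDBUsername("username")
                   .setInfluxDBPassword("password")
                   .setInfluxDBBucket("my-bucket")
                   .setInfluxDBOrganization("my-org")
                   .build();
   
   stream.sinkTo(influxDBSink); // DataStream#sinkTo accepts the unified Sink API
   ```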

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.split;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * The {@link org.apache.flink.core.io.SimpleVersionedSerializer serializer} for {@link
+ * InfluxDBSplit}.
+ */
+public final class InfluxDBSplitSerializer implements SimpleVersionedSerializer<InfluxDBSplit> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(final InfluxDBSplit influxDBSplit) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(0, influxDBSplit.getId());
+        return buffer.array();
+    }
+
+    @Override
+    public InfluxDBSplit deserialize(final int version, final byte[] serialized) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);

Review comment:
       Use `ByteBuffer.wrap(serialized)` here instead of allocating a fresh buffer.
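   i.e. (assuming `InfluxDBSplit` has a constructor taking the id):
   
   ```java
   @Override
   public InfluxDBSplit deserialize(final int version, final byte[] serialized) {
       // Wrap the incoming array instead of allocating a second buffer and copying.
       return new InfluxDBSplit(ByteBuffer.wrap(serialized).getLong());
   }
   ```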

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ *     Apache Druid InfluxDB Parser</a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {
+        if (raw.endsWith("i")) {
+            return Long.valueOf(raw.substring(0, raw.length() - 1));
+        }
+
+        return new Double(raw);

Review comment:
       Use `Double.valueOf(raw)`; the `new Double(String)` constructor is deprecated.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer;
+
+import java.io.Serializable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+
+/** An interface for the deserialization of InfluxDB data points. */
+public interface InfluxDBDataPointDeserializer<OUT> extends Serializable, ResultTypeQueryable<OUT> {
+
+    /**
+     * Deserializes a data point into a record of the output type.
+     *
+     * @param dataPoint the {@code DataPoint} to deserialize.
+     * @throws Exception if the deserialization failed.
+     */
+    OUT deserialize(DataPoint dataPoint) throws Exception;

Review comment:
       Should this be narrowed down to `IOException` instead of `Exception`?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader;
+
+import com.sun.net.httpserver.HttpServer;

Review comment:
       Please check whether there is an alternative that is not coming from `com.sun.net`, or verify that this is portable across JDK vendors.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+
+@Slf4j
+public final class WriteAPIHandler extends Handler {
+    private final InfluxParser parser = new InfluxParser();
+    private final int maximumLinesPerRequest;
+    private final FutureCompletingBlockingQueue ingestionQueue;
+    private final int threadIndex;
+    private final long enqueueWaitTime;
+
+    public WriteAPIHandler(
+            final int maximumLinesPerRequest,
+            final FutureCompletingBlockingQueue ingestionQueue,
+            final int threadIndex,
+            final long enqueueWaitTime) {
+        this.maximumLinesPerRequest = maximumLinesPerRequest;
+        this.ingestionQueue = ingestionQueue;
+        this.threadIndex = threadIndex;
+        this.enqueueWaitTime = enqueueWaitTime;
+    }
+
+    @Override
+    public void handle(final HttpExchange t) throws IOException {
+        final BufferedReader in =
+                new BufferedReader(
+                        new InputStreamReader(t.getRequestBody(), StandardCharsets.UTF_8));
+
+        try {
+            String line;
+            final List<DataPoint> points = new ArrayList<>();
+            int numberOfLinesParsed = 0;
+            while ((line = in.readLine()) != null) {
+                final DataPoint dataPoint = this.parser.parseToDataPoint(line);
+                points.add(dataPoint);
+                numberOfLinesParsed++;
+                if (numberOfLinesParsed > this.maximumLinesPerRequest) {
+                    throw new RequestTooLargeException(
+                            String.format(
+                                    "Payload too large. Maximum number of lines per request is %d.",
+                                    this.maximumLinesPerRequest));
+                }
+            }
+
+            final boolean result =
+                    CompletableFuture.supplyAsync(
+                                    () -> {
+                                        try {
+                                            return this.ingestionQueue.put(
+                                                    this.threadIndex, points);
+                                        } catch (final InterruptedException e) {
+                                            return false;
+                                        }
+                                    })
+                            .get(this.enqueueWaitTime, TimeUnit.SECONDS);
+
+            if (!result) {
+                throw new TimeoutException("Failed to enqueue");
+            }

Review comment:
       This should probably be solved in the `FutureCompletingBlockingQueue` but since we cannot extend it here, I'd leave it as is. But you are effectively using a third thread here.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {

Review comment:
       Please check if there are some non-trivial settings that deserve documentation. For example, what does a typical `url` or `bucket` look like?
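   For instance, the URL setter could carry an example value right in its Javadoc (the method body is taken from this PR; the value is only illustrative, InfluxDB 2.x listens on port 8086 by default):
   
   ```java
   /**
    * Sets the connection URL of the InfluxDB 2.x instance,
    * e.g. {@code "http://localhost:8086"}.
    */
   public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
       return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
   }
   ```
   
   The same applies to `bucket` (a named storage location with a retention policy) and `org`.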

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);

Review comment:
       `this.setProperty(INFLUXDB_URL.key(), Preconditions.checkNotNull(influxDBUrl))`

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * This class serializes and deserializes the commit values. Since we send the timestamp value
+ * as a committable, the Long object is (de)serialized.
+ */
+public final class InfluxDBCommittableSerializer implements SimpleVersionedSerializer<Long> {
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(final Long value) {

Review comment:
       Rename `value` to `timestamp`; that is what it actually holds.

##########
File path: flink-connector-influxdb2/src/main/resources/log4j.properties
##########
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+status=warn
+appender.console.type=Console
+appender.console.name=LogToConsole
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} - %highlight{%5p} %style{%logger{36}}{cyan} - %m%n%throwable
+### Logger Apache Streaming Connectors ###
+logger.streamingConnectors.name=org.apache.flink.streaming.connectors
+logger.streamingConnectors.level=INFO
+logger.streamingConnectors.additivity=false
+logger.streamingConnectors.appenderRef.console.ref=LogToConsole
+# Root Logger
+rootLogger.level=INFO

Review comment:
       This should be `OFF` in the final PR so it does not flood the build logs.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all buffered
+ * items are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the
+ * incoming elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");

Review comment:
       This should be logged at `debug` level, not `info`.

##########
File path: flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/InfluxDBSourceIntegrationTestCase.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.api.client.http.ByteArrayContent;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
+import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.util.ExponentialBackOff;
+import java.net.HttpURLConnection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import lombok.SneakyThrows;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSource;
+import org.apache.flink.streaming.connectors.util.InfluxDBTestDeserializer;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/** Integration test for the InfluxDB source for Flink. */
+class InfluxDBSourceIntegrationTestCase extends TestLogger {
+
+    private static final String HTTP_ADDRESS = "http://localhost";
+    private static final int PORT = 8000;
+
+    private static final HttpRequestFactory HTTP_REQUEST_FACTORY =
+            new NetHttpTransport().createRequestFactory();
+    private static final ExponentialBackOff HTTP_BACKOFF =
+            new ExponentialBackOff.Builder()
+                    .setInitialIntervalMillis(250)
+                    .setMaxElapsedTimeMillis(10000)
+                    .setMaxIntervalMillis(1000)
+                    .setMultiplier(1.3)
+                    .setRandomizationFactor(0.5)
+                    .build();
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
+    private final InfluxDBSource<Long> influxDBSource =
+            InfluxDBSource.<Long>builder()
+                    .setPort(PORT)
+                    .setDeserializer(new InfluxDBTestDeserializer())
+                    .build();
+
+    @BeforeAll

Review comment:
       This should be `@BeforeEach` so that every test starts from a fresh setup.

##########
File path: flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/util/InfluxDBTestDeserializer.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.util;
+
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+
+public class InfluxDBTestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");

Review comment:
       How about making `getField` generic? That would remove the cast here.
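   Something like this would push the cast into `DataPoint` and let the caller pick the type (sketch; assumes the fields live in a `Map<String, Object>` named `fields`):
   
   ```java
   @SuppressWarnings("unchecked")
   public <T> T getField(final String fieldKey) {
       // Unchecked by design: the caller declares the expected field type.
       return (T) this.fields.get(fieldKey);
   }
   ```
   
   The test deserializer then becomes `final Long value = dataPoint.getField("longValue");` without a cast.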

##########
File path: flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/util/InfluxDBContainer.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+@Slf4j
+public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>

Review comment:
       This could also be a nice upstream contribution to Testcontainers.

##########
File path: flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/util/RetentionUnit.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.util;
+
+public enum RetentionUnit {

Review comment:
       Move to container class? A bit overengineered for the use case ;)

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all buffered
+ * items are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the
+ * incoming elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());
+                this.elements.add(this.schemaSerializer.serialize(in, context));
+                if (context.timestamp() != null) {
+                    this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+                }
+            }
+        } catch (final Exception e) {
+            log.error(e.getMessage());

Review comment:
       I don't think that swallowing the exception is useful here. It results in silent data loss.
   If the user desires such behavior, they should catch and log inside their serializer.
   So please remove the try-catch completely.
   It might be best to allow `serializer.serialize` to throw only `IOException` (same for the deserializer).
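   Roughly like this (sketch only; assumes `SinkWriter#write` may declare `IOException`, that `serialize` is narrowed to `throws IOException`, and that `writeCurrentElements` throws at most `IOException`). As a side effect it also fixes the current `if`/`else`, which silently drops the element that arrives while the buffer is full:
   
   ```java
   @Override
   public void write(final IN in, final Context context) throws IOException {
       if (this.elements.size() == this.bufferSize) {
           log.debug("Buffer size reached; writing {} elements.", this.elements.size());
           this.writeCurrentElements();
           this.elements.clear();
       }
       // May throw IOException; let it propagate instead of swallowing it.
       this.elements.add(this.schemaSerializer.serialize(in, context));
       if (context.timestamp() != null) {
           this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
       }
   }
   ```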

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import com.influxdb.client.write.Point;
+import java.io.Serializable;
+import org.apache.flink.api.connector.sink.SinkWriter.Context;
+
+public interface InfluxDBSchemaSerializer<IN> extends Serializable {
+
+    /**
+     * Serializes the input into an InfluxDB point.
+     *
+     * @param element to serialize.
+     * @throws Exception if the serialization failed.
+     */
+    Point serialize(final IN element, final Context context) throws Exception;

Review comment:
       Declaring only `IOException` here would make your life easier.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumState;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumStateSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSplitEnumerator;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBRecordEmitter;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSourceReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSplitReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplitSerializer;
+
+/**
+ * The Source implementation of InfluxDB. Please use an {@link InfluxDBSourceBuilder} to construct
+ * an {@link InfluxDBSource}. The following example shows how to create an InfluxDBSource emitting
+ * records of <code>String</code> type.
+ *
+ * <p>See {@link InfluxDBSourceBuilder} for more details.
+ *
+ * @param <OUT> the output type of the source.
+ */
+public final class InfluxDBSource<OUT>
+        implements Source<OUT, InfluxDBSplit, InfluxDBSourceEnumState>, ResultTypeQueryable<OUT> {
+
+    private final Properties properties;
+    private final InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+
+    InfluxDBSource(
+            final Properties properties,
+            final InfluxDBDataPointDeserializer<OUT> deserializationSchema) {
+        this.properties = properties;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    /**
+     * Get an {@link InfluxDBSourceBuilder} to build an {@link InfluxDBSource}.
+     *
+     * @return an InfluxDB source builder.
+     */
+    public static <OUT> InfluxDBSourceBuilder<OUT> builder() {
+        return new InfluxDBSourceBuilder<>();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, InfluxDBSplit> createReader(
+            final SourceReaderContext sourceReaderContext) {
+        final Supplier<InfluxDBSplitReader> splitReaderSupplier =
+                () -> new InfluxDBSplitReader(this.properties);
+        final InfluxDBRecordEmitter<OUT> recordEmitter =
+                new InfluxDBRecordEmitter<>(this.deserializationSchema);
+
+        return new InfluxDBSourceReader<>(
+                splitReaderSupplier,
+                recordEmitter,
+                this.toConfiguration(this.properties),

Review comment:
       This is a good indication that it would be better to ditch `Properties` entirely.
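   A small immutable configuration object would give type safety and make the `Properties`-to-`Configuration` conversion unnecessary. A rough sketch (hypothetical class and accessor names; the fields mirror settings that already appear in this PR):
   
   ```java
   /** Hypothetical typed replacement for the untyped Properties bag. */
   public final class InfluxDBSourceConfiguration implements java.io.Serializable {
       private static final long serialVersionUID = 1L;
   
       private final int port;
       private final int maximumLinesPerRequest;
       private final long enqueueWaitTime;
   
       public InfluxDBSourceConfiguration(
               final int port, final int maximumLinesPerRequest, final long enqueueWaitTime) {
           this.port = port;
           this.maximumLinesPerRequest = maximumLinesPerRequest;
           this.enqueueWaitTime = enqueueWaitTime;
       }
   
       public int getPort() {
           return this.port;
       }
   
       public int getMaximumLinesPerRequest() {
           return this.maximumLinesPerRequest;
       }
   
       public long getEnqueueWaitTime() {
           return this.enqueueWaitTime;
       }
   }
   ```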

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+
+public final class InfluxDBRecordEmitter<T> implements RecordEmitter<DataPoint, T, InfluxDBSplit> {
+    private final InfluxDBDataPointDeserializer<T> dataPointDeserializer;
+
+    public InfluxDBRecordEmitter(final InfluxDBDataPointDeserializer<T> dataPointDeserializer) {
+        this.dataPointDeserializer = dataPointDeserializer;
+    }
+
+    @Override
+    public void emitRecord(
+            final DataPoint element, final SourceOutput<T> output, final InfluxDBSplit splitState)
+            throws Exception {
+        output.collect(
+                this.dataPointDeserializer.deserialize(element), (Long) element.getTimestamp());

Review comment:
       The `Long` cast should not be necessary here.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Committer;
+
+/**
+ * The InfluxDBCommitter implements the {@link Committer} interface. It is called whenever a
+ * checkpoint is set by Flink; when invoked, it writes a checkpoint data point to InfluxDB. The
+ * checkpoint data point uses the latest written record timestamp.
+ */
+@Slf4j
+public final class InfluxDBCommitter implements Committer<Long> {
+
+    private final InfluxDBClient influxDBClient;
+    private final boolean writeCheckpoint;
+
+    public InfluxDBCommitter(final Properties properties) {
+        this.influxDBClient = getInfluxDBClient(properties);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+    }
+
+    /**
+     * This method is called only when a checkpoint is set and writes a checkpoint data point into
+     * InfluxDB. The {@link
+     * org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter} prepares the
+     * commit and fills the committable list with the latest timestamp. If the list contains a single
+     * element, it will be used as the timestamp of the data point. Otherwise, when no timestamp is
+     * provided, InfluxDB will use the current timestamp (UTC) of the host machine.
+     *
+     * <p>
+     *
+     * @param committables Contains the latest written timestamp.
+     * @return Empty list
+     * @see <a
+     *     href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#timestamp></a>
+     */
+    @SneakyThrows
+    @Override
+    public List<Long> commit(final List<Long> committables) {
+        if (this.writeCheckpoint) {
+            log.debug("A checkpoint is set.");
+            Optional<Long> lastTimestamp = Optional.empty();
+            if (committables.size() >= 1) {
+                lastTimestamp = Optional.ofNullable(committables.get(committables.size() - 1));
+            }
+            this.writeCheckpointDataPoint(lastTimestamp);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void close() {
+        this.influxDBClient.close();
+        log.debug("Closing the committer.");
+    }
+
+    private void writeCheckpointDataPoint(final Optional<Long> timestamp) {
+        try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
+            final Point point = new Point("checkpoint");
+            point.addField("checkpoint", "flink");
+            timestamp.ifPresent(aTime -> point.time(aTime, WritePrecision.NS));
+            writeApi.writePoint(point);
+            log.debug("Checkpoint data point write at {}", point.toLineProtocol());
+        }
+    }

Review comment:
       They all seem to be runtime exceptions. I have not seen a single exception where you could easily recover, so let's just let them trickle through.
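    Without `@SneakyThrows`, `commit` could look like this (a sketch; it assumes the InfluxDB write API really only throws unchecked exceptions, as observed above):
    ```java
    @Override
    public List<Long> commit(final List<Long> committables) {
        if (this.writeCheckpoint) {
            log.debug("A checkpoint is set.");
            Optional<Long> lastTimestamp = Optional.empty();
            if (committables.size() >= 1) {
                lastTimestamp = Optional.ofNullable(committables.get(committables.size() - 1));
            }
            // Runtime exceptions from the write API propagate to Flink as-is.
            this.writeCheckpointDataPoint(lastTimestamp);
        }
        return Collections.emptyList();
    }
    ```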

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {

Review comment:
       Name -> `addCheckpointDataPoint`?
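    Applied to the builder, the rename would look like this (sketch, body taken from the current setter):
    ```java
    public InfluxDBSinkBuilder<IN> addCheckpointDataPoint(final boolean shouldWrite) {
        return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
    }
    ```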

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).

Review comment:
       I'd probably enhance the description to make clear why Telegraf was used. Maybe even a small diagram
   ```
   sensor -> telegraf -> [ influx CDC source -> ... -> influx sink ] -> influx db
   ```

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all the items
+ * are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the incoming
+ * elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp among the incoming elements by
+     * comparing it with context.timestamp() and keeping the larger (more recent) of the two.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");

Review comment:
       `info` is pretty much the default level for most production setups and I'd assume that this information is not that interesting for most users.
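    For example, demoting the flush message to DEBUG (sketch):
    ```java
    log.debug("Buffer size reached, preparing to write the elements.");
    ```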




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r593932186



##########
File path: flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/util/InfluxDBContainer.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+@Slf4j
+public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>

Review comment:
       It is :wink: 
   https://github.com/testcontainers/testcontainers-java/pull/3669




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] eskabetxe commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
eskabetxe commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r592132222



##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>

Review comment:
       this should use the same Scala version as the parent.
   

##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <flink.new.version>1.12.2</flink.new.version>

Review comment:
       we should update the Flink version in the parent

##########
File path: pom.xml
##########
@@ -93,7 +94,7 @@
 
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
-    <log4j2.version>2.13.3</log4j2.version>
+    <log4j2.version>2.12.1</log4j2.version>

Review comment:
       why did you lower the log4j2 version?

##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <flink.new.version>1.12.2</flink.new.version>
+    <lombok.version>1.18.10</lombok.version>
+    <scala.binary.new.version>2.11</scala.binary.new.version>
+    <spotless.version>2.7.0</spotless.version>
+    <druid.version>0.13.0-incubating</druid.version>
+    <!--  Test Properties  -->
+    <testcontainers.version>1.15.2</testcontainers.version>
+    <hamcrest.version>2.2</hamcrest.version>
+    <google.http.client.version>1.39.0</google.http.client.version>
+  </properties>
+
+  <dependencies>
+
+    <!-- Flink  -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-base</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.projectlombok</groupId>

Review comment:
       I see that you use this for getters and logging; as the use of this library could be a barrier for new committers, could you delombok those uses?
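    A sketch of what the delombok'd equivalents could look like (the class name here is illustrative):
    ```java
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class InfluxDBWriterExample {

        // Replaces @Slf4j: an explicit SLF4J logger field.
        private static final Logger LOG = LoggerFactory.getLogger(InfluxDBWriterExample.class);

        private final int bufferSize;

        public InfluxDBWriterExample(final int bufferSize) {
            this.bufferSize = bufferSize;
        }

        // Replaces @Getter on the bufferSize field.
        public int getBufferSize() {
            return this.bufferSize;
        }
    }
    ```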

##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <flink.new.version>1.12.2</flink.new.version>
+    <lombok.version>1.18.10</lombok.version>
+    <scala.binary.new.version>2.11</scala.binary.new.version>
+    <spotless.version>2.7.0</spotless.version>
+    <druid.version>0.13.0-incubating</druid.version>
+    <!--  Test Properties  -->
+    <testcontainers.version>1.15.2</testcontainers.version>
+    <hamcrest.version>2.2</hamcrest.version>
+    <google.http.client.version>1.39.0</google.http.client.version>
+  </properties>
+
+  <dependencies>
+
+    <!-- Flink  -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-base</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>${lombok.version}</version>
+    </dependency>
+
+    <!-- InfluxDB  -->
+
+    <dependency>
+      <groupId>com.influxdb</groupId>
+      <artifactId>influxdb-client-java</artifactId>
+      <version>${influxdbClient.version}</version>
+    </dependency>
+
+    <!-- InfluxDB Line Protocol Parser by Apache Druid -->
+
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-influx-extensions</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+
+    <!-- Logging -->
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+
+    <!-- Flink Test Utils -->
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils_${scala.binary.new.version}</artifactId>
+      <version>${flink.new.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.new.version}</artifactId>
+      <version>${flink.new.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <!-- Test container -->
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>influxdb</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google.http.client.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>

Review comment:
       the use of this kind of plugin should be discussed, and it should be configured in the parent

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {
+        if (raw.endsWith("i")) {
+            return Long.valueOf(raw.substring(0, raw.length() - 1));
+        }
+
+        return Double.valueOf(raw);
+    }
+
+    private Object parseBool(final String raw) {
+        final char first = raw.charAt(0);
+        if (first == 't' || first == 'T') {

Review comment:
       this could be reduced to
   return (first == 't' || first == 'T');
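    The whole method would then reduce to (sketch, following the suggestion above; the boolean is autoboxed to the `Object` return type):
    ```java
    private Object parseBool(final String raw) {
        final char first = raw.charAt(0);
        return first == 't' || first == 'T';
    }
    ```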

##########
File path: pom.xml
##########
@@ -75,6 +75,7 @@
     <module>flink-connector-akka</module>
     <module>flink-connector-flume</module>
     <module>flink-connector-influxdb</module>
+    <module>flink-connector-influxdb2</module>

Review comment:
       why create another module?
   Could the first one not be updated?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import static lombok.Lombok.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+
+public final class InfluxDBSourceBuilder<OUT> {
+
+    private InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+    // Configurations
+    private final Properties properties;
+
+    InfluxDBSourceBuilder() {
+        this.deserializationSchema = null;
+        this.properties = new Properties();
+    }
+
+    /**
+     * Sets the {@link InfluxDBDataPointDeserializer deserializer} of the {@link
+     * org.apache.flink.streaming.connectors.influxdb.common.DataPoint DataPoint} for the
+     * InfluxDBSource.
+     *
+     * @param dataPointDeserializer the deserializer for InfluxDB {@link
+     *     org.apache.flink.streaming.connectors.influxdb.common.DataPoint DataPoint}.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setDeserializer(
+            final InfluxDBDataPointDeserializer<OUT> dataPointDeserializer) {
+        this.deserializationSchema = dataPointDeserializer;
+        return this;
+    }
+
+    /**
+     * Sets the enqueue wait time, i.e., the timeout of this InfluxDBSource.
+     *
+     * @param timeOut the enqueue wait time to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setEnqueueWaitTime(final long timeOut) {
+        return this.setProperty(
+                InfluxDBSourceOptions.ENQUEUE_WAIT_TIME.key(), String.valueOf(timeOut));
+    }
+
+    /**
+     * Sets the ingest queue capacity of this InfluxDBSource.
+     *
+     * @param capacity the capacity to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setIngestQueueCapacity(final int capacity) {
+        return this.setProperty(
+                InfluxDBSourceOptions.INGEST_QUEUE_CAPACITY.key(), String.valueOf(capacity));
+    }
+
+    /**
+     * Sets the maximum number of lines that should be parsed per HTTP request for this
+     * InfluxDBSource.
+     *
+     * @param max the maximum number of lines to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setMaximumLinesPerRequest(final int max) {
+        return this.setProperty(
+                InfluxDBSourceOptions.MAXIMUM_LINES_PER_REQUEST.key(), String.valueOf(max));
+    }
+
+    /**
+     * Sets the TCP port on which the split reader's HTTP server of this InfluxDBSource is
+     * running.
+     *
+     * @param port the port to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setPort(final int port) {
+        return this.setProperty(InfluxDBSourceOptions.PORT.key(), String.valueOf(port));
+    }
+
+    /**
+     * Set arbitrary properties for the InfluxDBSource. The valid keys can be found in {@link
+     * InfluxDBSourceOptions}.
+     *
+     * @param properties the properties to set for the InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setProperties(final Properties properties) {
+        this.properties.putAll(properties);
+        return this;
+    }
+
+    /**
+     * Build the {@link InfluxDBSource}.
+     *
+     * @return an InfluxDBSource with the settings made for this builder.
+     */
+    public InfluxDBSource<OUT> build() {
+        this.sanityCheck();
+        return new InfluxDBSource<>(this.properties, this.deserializationSchema);
+    }
+
+    // ------------- private helpers  --------------
+    /**
+     * Set an arbitrary property for the InfluxDBSource. The valid keys can be found in {@link
+     * InfluxDBSourceOptions}.
+     *
+     * @param key the key of the property.
+     * @param value the value of the property.
+     * @return this InfluxDBSourceBuilder.
+     */
+    private InfluxDBSourceBuilder<OUT> setProperty(final String key, final String value) {
+        this.properties.setProperty(key, value);
+        return this;
+    }
+
+    private void sanityCheck() {
+        checkNotNull(

Review comment:
       why not use Preconditions.checkNotNull from Flink?
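    A sketch of the suggested swap (the error message wording is assumed):
    ```java
    import static org.apache.flink.util.Preconditions.checkNotNull;

    private void sanityCheck() {
        // Flink's own utility instead of lombok.Lombok.checkNotNull:
        checkNotNull(this.deserializationSchema, "Deserialization schema is required but not provided.");
    }
    ```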




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] bkahloon commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
bkahloon commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-822853231


   Hi @1p4pk, @Shark, @raminqaf,
   
   I was just wondering if the source connector would be usable via the Flink SQL API. If there isn't a direct integration available with this PR, what work do you suspect would be needed?
   
   Thank you for your help.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger merged pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger merged pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] 1p4pk commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r595982807



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {
+        return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
+    }
+
+    public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {

Review comment:
       This trade-off can be seen in the benchmarks. Is further description needed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r594208607



##########
File path: flink-connector-influxdb2/pom.xml
##########
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <flink.new.version>1.12.2</flink.new.version>
+    <lombok.version>1.18.10</lombok.version>
+    <scala.binary.new.version>2.11</scala.binary.new.version>
+    <spotless.version>2.7.0</spotless.version>
+    <druid.version>0.13.0-incubating</druid.version>
+    <!--  Test Properties  -->
+    <testcontainers.version>1.15.2</testcontainers.version>
+    <hamcrest.version>2.2</hamcrest.version>
+    <google.http.client.version>1.39.0</google.http.client.version>
+  </properties>
+
+  <dependencies>
+
+    <!-- Flink  -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-base</artifactId>
+      <version>${flink.new.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>${lombok.version}</version>
+    </dependency>
+
+    <!-- InfluxDB  -->
+
+    <dependency>
+      <groupId>com.influxdb</groupId>
+      <artifactId>influxdb-client-java</artifactId>
+      <version>${influxdbClient.version}</version>
+    </dependency>
+
+    <!-- InfluxDB Line Protocol Parser by Apache Druid -->
+
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-influx-extensions</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+
+    <!-- Logging -->
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+
+    <!-- Flink Test Utils -->
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils_${scala.binary.new.version}</artifactId>
+      <version>${flink.new.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.new.version}</artifactId>
+      <version>${flink.new.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <!-- Test container -->
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>influxdb</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google.http.client.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>

Review comment:
       We used this plugin since Flink switched to spotless: https://github.com/apache/flink/pull/14488




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] 1p4pk commented on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-838304215


   @rmetzger done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-831050905


   For your next contribution to Bahir, please create a Jira ticket: https://issues.apache.org/jira/browse/BAHIR-274


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r592165074



##########
File path: pom.xml
##########
@@ -75,6 +75,7 @@
     <module>flink-connector-akka</module>
     <module>flink-connector-flume</module>
     <module>flink-connector-influxdb</module>
+    <module>flink-connector-influxdb2</module>

Review comment:
       Unfortunately not, since there are breaking changes in InfluxDB 2.x. Even the Java client is written from scratch and is not compatible with version 1.7 anymore. More information [here](https://github.com/influxdata/influxdb-client-java#note-use-this-client-library-with-influxdb-2x-and-influxdb-18-see-details-for-connecting-to-influxdb-17-or-earlier-instances-use-the-influxdb-java-client-library). We created a separate module to avoid a situation where other users still rely on the InfluxDB 1.x connector.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r592166181



##########
File path: pom.xml
##########
@@ -93,7 +94,7 @@
 
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
-    <log4j2.version>2.13.3</log4j2.version>
+    <log4j2.version>2.12.1</log4j2.version>

Review comment:
       We were struggling with this issue in our logs: https://issues.apache.org/jira/browse/LOG4J2-2901
   The logs were not printed on the console and we got this error:
   `ERROR StatusLogger Reconfiguration failed: No configuration found for '2aae9190' at 'null' in 'null'`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] 1p4pk commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r597164338



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <h3>Elements of line protocol</h3>
+ *
+ * <pre>
+ *
+ * measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200
+ * --------------- --------------- --------------------- -------------------
+ *      |               |                   |                     |
+ * Measurement       Tag set            Field set              Timestamp
+ *
+ * </pre>
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.
+ */
+public final class DataPoint {
+
+    private final String measurement;
+    private final Map<String, String> tags = new HashMap<>();
+    private final Map<String, Object> fields = new HashMap<>();
+    private final Long timestamp;
+
+    DataPoint(final String measurementName, @Nullable final Long timestamp) {
+        Arguments.checkNotNull(measurementName, "measurement");

Review comment:
       Flink's `Preconditions` does not have an equivalent of `Arguments.checkNonEmpty`. Do you want us to make it fit or just leave it as it is?
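    A hypothetical helper that rebuilds `Arguments.checkNonEmpty` on top of Flink's `Preconditions` (names assumed):
    ```java
    import static org.apache.flink.util.Preconditions.checkArgument;
    import static org.apache.flink.util.Preconditions.checkNotNull;

    static String checkNonEmpty(final String value, final String name) {
        checkNotNull(value, name + " must not be null");
        checkArgument(!value.trim().isEmpty(), "%s must not be empty", name);
        return value;
    }
    ```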




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] 1p4pk commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r623800214



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+
+/**
+ * The builder class for {@link InfluxDBSource} to make it easier for users to construct a
+ * {@link InfluxDBSource}.
+ *
+ * <p>The following example shows the minimum setup to create an InfluxDBSource that reads Long
+ * values from a line protocol source.
+ *
+ * <pre>{@code
+ * InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+ * .setDeserializer(new InfluxDBDeserializer())
+ * .build();
+ * }</pre>
+ *
+ * <p>To specify the starting port on which the InfluxDBSource starts its HTTP server, one can call
+ * {@link #setPort(int)}.
+ *
+ * <p>Check the Java docs of each individual method to learn more about the settings to build an
+ * InfluxDBSource.
+ */
+public final class InfluxDBSourceBuilder<OUT> {
+
+    private InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+    // Configurations

Review comment:
       Delete it before merging?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r592165074



##########
File path: pom.xml
##########
@@ -75,6 +75,7 @@
     <module>flink-connector-akka</module>
     <module>flink-connector-flume</module>
     <module>flink-connector-influxdb</module>
+    <module>flink-connector-influxdb2</module>

Review comment:
       Unfortunately not, since there are breaking changes in InfluxDB 2.x. Even the Java client was rewritten from scratch and is no longer compatible with version 1.7. More information [here](https://github.com/influxdata/influxdb-client-java#note-use-this-client-library-with-influxdb-2x-and-influxdb-18-see-details-for-connecting-to-influxdb-17-or-earlier-instances-use-the-influxdb-java-client-library). We created a separate module to avoid breaking users who still rely on the InfluxDB 1.x connector.
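
       To illustrate the difference, a minimal sketch of connecting with the 2.x Java client (`influxdb-client-java`); the URL, token, org, and bucket values are placeholders:

       ```java
       import com.influxdb.client.InfluxDBClient;
       import com.influxdb.client.InfluxDBClientFactory;

       // The 2.x client authenticates with a token and scopes writes to an
       // organization/bucket pair, which has no counterpart in the 1.x API.
       InfluxDBClient client = InfluxDBClientFactory.create(
               "http://localhost:8086", "my-token".toCharArray(), "my-org", "my-bucket");
       ```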




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] 1p4pk commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-823026141


   Hi @bkahloon,
   
   my first guess would be no, as this connector implements the [DataStream source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). However, based on the documentation, you need to implement the [Table API & SQL user-defined source and sink API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html) for that.
   
   @AHeise should know better, correct me if I am mistaken.
   
   Best,
   Leon


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-831054517


   @1p4pk Could you squash all your commits into one, prefixed with "[BAHIR-274] .." ?
   I tried squashing your change, but it seems that you've merged stuff from master into your branch in between your work. I tried rebasing to master, but that caused conflicts, which I don't have time to resolve right now.
   
   Just force push the cleaned up branch here, then I'll merge it.
   
   Sorry for the back and forth.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger commented on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-838308154


   Thanks a lot. It seems that there are a number of conflicts with `master`. Could you resolve them?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] eskabetxe commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
eskabetxe commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r604295073



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import com.influxdb.client.write.Point;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+
+@Internal
+public final class InfluxDBPointSerializer implements SimpleVersionedSerializer<Point> {
+
+    private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+    @Override
+    public int getVersion() {
+        return 0;

Review comment:
       shouldn't this be 1?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] 1p4pk commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r596043273



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ *     Apache Druid InfluxDB Parser</a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);

Review comment:
       Is static now
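
       For context, a hypothetical call site after that change (the line protocol string is just an example):

       ```java
       // InfluxParser.parseToDataPoint is now a static utility; it throws
       // ParseException on malformed line protocol input.
       final DataPoint dataPoint = InfluxParser.parseToDataPoint(
               "test,longValue=1 fieldKey=\"fieldValue\" 1465839830100400200");
       ```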




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r623013910



##########
File path: flink-connector-influxdb2/license-header
##########
@@ -0,0 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review comment:
       what's the benefit of this file?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+
+/**
+ * The builder class for {@link InfluxDBSource} to make it easier for users to construct a
+ * {@link InfluxDBSource}.
+ *
+ * <p>The following example shows the minimum setup to create an InfluxDBSource that reads Long
+ * values from a line protocol source.
+ *
+ * <pre>{@code
+ * InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+ * .setDeserializer(new InfluxDBDeserializer())
+ * .build();
+ * }</pre>
+ *
+ * <p>To specify the starting port on which the InfluxDBSource starts its HTTP server, one can call
+ * {@link #setPort(int)}.
+ *
+ * <p>Check the Java docs of each individual method to learn more about the settings to build an
+ * InfluxDBSource.
+ */
+public final class InfluxDBSourceBuilder<OUT> {
+
+    private InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+    // Configurations

Review comment:
       nit: this comment is not really adding value




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-814852508


   Maybe host the PDF somewhere else and link to it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger commented on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-848099841


   Thanks a lot for adjusting!
   
   Merging PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] 1p4pk commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r596045253



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all buffered
+ * items are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the
+ * incoming elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            log.debug("Adding element to buffer. Buffer size: {}", this.elements.size());
+            this.elements.add(this.schemaSerializer.serialize(in, context));
+            if (context.timestamp() != null) {
+                this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+            }
+            // Flush after adding, so the current element is not dropped when the buffer is full.
+            if (this.elements.size() >= this.bufferSize) {
+                log.info("Buffer size reached, preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            }
+        } catch (final Exception e) {
+            log.error(e.getMessage());
+        }
+    }
+
+    /**
+     * This method is called whenever a checkpoint is set by Flink. It creates a list and fills it
+     * with the latest timestamp.
+     *
+     * @param flush
+     * @return A list containing 0 or 1 element
+     */
+    @Override
+    public List<Long> prepareCommit(final boolean flush) {
+        if (this.lastTimestamp == 0) {
+            return Collections.emptyList();
+        }
+        final List<Long> lastTimestamp = new ArrayList<>(1);
+        lastTimestamp.add(this.lastTimestamp);
+        return lastTimestamp;
+    }
+
+    @Override
+    public List<Point> snapshotState() {
+        return this.elements;
+    }
+
+    @Override
+    public void close() throws Exception {
+        log.debug("Preparing to write the elements in close.");
+        this.writeCurrentElements();
+        log.debug("Closing the writer.");
+        this.elements.clear();
+    }
+
+    public void setProcessingTimerService(final ProcessingTimeService processingTimerService) {
+        this.processingTimerService = processingTimerService;
+    }
+
+    private void writeCurrentElements() throws Exception {
+        try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {

Review comment:
       Correct, the only way.
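
       For readers following along, the pattern under discussion, sketched under the assumption that the client is configured with a default bucket and organization:

       ```java
       private void writeCurrentElements() {
           // getWriteApi() returns a buffering client; closing it via
           // try-with-resources is the only way to force a flush.
           try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
               writeApi.writePoints(this.elements);
           } // close() blocks until all buffered points have been written
       }
       ```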




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] raminqaf edited a comment on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf edited a comment on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-838786735


   @rmetzger Everything should be ready and set for the merge now! :rocket: Only one small thing:
   I noticed that the Travis CI pipeline is failing (for both environments, Scala 2.11 & 2.12) due to three tests in the `flume` connector. Here is a screenshot of the Travis logs:
   ![image](https://user-images.githubusercontent.com/20357405/117848190-561af800-b283-11eb-9bde-e4946202793f.png)
   I found a quick fix: updating to the newest version of `testcontainers` seems to resolve the problem. I created a [PR](https://github.com/apache/bahir-flink/pull/123) for this issue, and the pipeline passes successfully. I didn't know how to create an issue on Jira, but maybe you can help me out with that :) Also, I couldn't add anyone as a reviewer on my PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] raminqaf commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
raminqaf commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r593929428



##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server is started per SplitReader and parses incoming HTTP requests into our DataPoint class. Each DataPoint instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, you have two choices to configure it: you can either configure its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) to write to the running HTTP servers, or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http) for that:
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+        
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The timeout in seconds for enqueuing an HTTP request into the ingest queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers data points from HTTP requests before they are fetched. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the split reader's HTTP server listens. | 8000 |
+
+### Supported Data Types in Field Set
+
+| Field Set     | Support       | 
+| ------------- |:-------------:| 
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |

Review comment:
       Yeah, that is right. I searched their issue tracker but couldn't find any issue related to this limitation. I will create one and link it!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] 1p4pk commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-802206987


   Hi @rmetzger,
   
   could you also review this commit?
   
   Thank you.
   
   Best,
   Leon


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger commented on pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-829210249


   I'll merge this PR in the next 24 hours (I want to leave some time because I'm not merging things that frequently here in bahir)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] rmetzger commented on a change in pull request #114: InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
rmetzger commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r608589193



##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ *     Apache Druid InfluxDB Parser</a>
+ */

Review comment:
       ```suggestion
    * This class contains code copied from the <a
    *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
    *     Apache Druid InfluxDB Parser </a>, licensed under the Apache License, Version 2.0.
    */
   ```
   
    Could you please replace `master` with the commit you used when you copied the code?
   
   If possible, it would be good if you could clearly mark which code has been copied. For example you could add a comment like this 
   
   ```
   // ----- everything below this line has been copied from https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java ---
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] 1p4pk commented on pull request #114: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

Posted by GitBox <gi...@apache.org>.
1p4pk commented on pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#issuecomment-838309792


   Yes, @raminqaf squashed all commits ever made in this repo. He is trying to fix it...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org