Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2021/03/12 10:25:24 UTC

[GitHub] [bahir-flink] AHeise commented on a change in pull request #114: InfluxDBv2.0 Connector

AHeise commented on a change in pull request #114:
URL: https://github.com/apache/bahir-flink/pull/114#discussion_r592999342



##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).

Review comment:
       Add CDC to description.

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).

Review comment:
       `link` -> `shade`

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader that parses HTTP requests to our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, you have two options for configuring it. You can either configure its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) for writing to the running HTTP servers or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+        
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The timeout in seconds for enqueuing an HTTP request to the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers HTTP request data points before fetching. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the split reader's HTTP server runs. | 8000 |

Review comment:
       👍 

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader that parses HTTP requests to our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.

Review comment:
       Not sure if users know what a SplitReader is. How about replacing it with source instance?

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader that parses HTTP requests to our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, you have two options for configuring it. You can either configure its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) for writing to the running HTTP servers or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)

Review comment:
       👍 

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader that parses HTTP requests to our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, you have two options for configuring it. You can either configure its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) for writing to the running HTTP servers or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+        
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The timeout in seconds for enqueuing an HTTP request to the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers HTTP request data points before fetching. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the split reader's HTTP server runs. | 8000 |
+
+### Supported Data Types in Field Set
+
+| Field Set     | Support       | 
+| ------------- |:-------------:| 
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |
+|    String     | ✅            |
+|    Boolean    | ✅            |
+
+See the supported InfluxDB field set value [data types](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set).
+ 
+
+## Sink
+
+The Sink writes data points to InfluxDB using the [InfluxDB Java Client](https://github.com/influxdata/influxdb-client-java). You provide the connection information (URL, username, password, bucket, and organization) and an implementation of the `InfluxDBSchemaSerializer<IN>` generic interface. The implementation of the interface overrides the `serialize(IN element, Context context)` function. This function serializes incoming Flink elements of type `IN` to [Point](https://github.com/influxdata/influxdb-client-java/blob/master/client/src/main/java/com/influxdb/client/write/Point.java) objects.
+
+It is possible to write multiple data points to InfluxDB simultaneously by separating each point with a new line. Batching data points in this manner results in much higher performance. The batch size can be set through the `WRITE_BUFFER_SIZE` option. By default, the buffer size is set to 1000 and can be changed to any value using the `setWriteBufferSize(final int bufferSize)` method of the Sink builder class.
+
+It is possible to write checkpoint data points to InfluxDB whenever Flink sets a checkpoint. To enable this functionality, you need to set the `WRITE_DATA_POINT_CHECKPOINT` flag to true (default is false). The checkpoint data point looks as follows:
+```text
+checkpoint checkpoint=flink <timestamp>
+```
+The timestamp refers to the latest element that Flink serializes.
+
+### Usage
+
+```java
+// The InfluxDB Sink uses the builder pattern to create a Sink object
+InfluxDBSink<Long> influxDBSink = InfluxDBSink.<Long>builder()

Review comment:
       Similar idea: leave `builder()` untyped?

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader that parses HTTP requests to our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, you have two options for configuring it. You can either configure its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) for writing to the running HTTP servers or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()

Review comment:
       Maybe builder should be untyped (`?`) until the user sets the deserializer?
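For illustration, a minimal sketch of how the builder could stay untyped until the deserializer fixes the element type (the re-typing setter is my reading of the suggestion; field and constructor names are hypothetical):

```java
public final class InfluxDBSourceBuilder<OUT> {
    private InfluxDBDataPointDeserializer<OUT> deserializer;

    // Users would start from an untyped builder, e.g. InfluxDBSource.builder(),
    // and the element type is only pinned down once the deserializer is set.
    @SuppressWarnings("unchecked")
    public <T> InfluxDBSourceBuilder<T> setDeserializer(
            final InfluxDBDataPointDeserializer<T> dataPointDeserializer) {
        final InfluxDBSourceBuilder<T> typedBuilder = (InfluxDBSourceBuilder<T>) this;
        typedBuilder.deserializer = dataPointDeserializer;
        return typedBuilder;
    }

    public InfluxDBSource<OUT> build() {
        return new InfluxDBSource<>(this.deserializer);
    }
}
```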

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). The Sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader that parses HTTP requests to our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, you have two options for configuring it. You can either configure its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) for writing to the running HTTP servers or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+        
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The timeout in seconds for enqueuing an HTTP request to the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers HTTP request data points before fetching. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the split reader's HTTP server runs. | 8000 |
+
+### Supported Data Types in Field Set
+
+| Field Set     | Support       | 
+| ------------- |:-------------:| 
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |
+|    String     | ✅            |
+|    Boolean    | ✅            |
+
+See the supported InfluxDB field set value [data types](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set).
+ 
+
+## Sink
+
+The Sink writes data points to InfluxDB using the [InfluxDB Java Client](https://github.com/influxdata/influxdb-client-java). You provide the connection information (URL, username, password, bucket, and organization) and an implementation of the `InfluxDBSchemaSerializer<IN>` generic interface. The implementation of the interface overrides the `serialize(IN element, Context context)` function. This function serializes incoming Flink elements of type `IN` to [Point](https://github.com/influxdata/influxdb-client-java/blob/master/client/src/main/java/com/influxdb/client/write/Point.java) objects.
+
+It is possible to write multiple data points to InfluxDB simultaneously by separating each point with a new line. Batching data points in this manner results in much higher performance. The batch size can be set through the `WRITE_BUFFER_SIZE` option. By default, the buffer size is set to 1000 and can be changed to any value using the `setWriteBufferSize(final int bufferSize)` method of the Sink builder class.
+
+It is possible to write checkpoint data points to InfluxDB whenever Flink sets a checkpoint. To enable this functionality, you need to set the `WRITE_DATA_POINT_CHECKPOINT` flag to true (default is false). The checkpoint data point looks as follows:
+```text
+checkpoint checkpoint=flink <timestamp>
+```
+The timestamp refers to the latest element that Flink serializes.
+
+### Usage
+
+```java
+// The InfluxDB Sink uses the builder pattern to create a Sink object
+InfluxDBSink<Long> influxDBSink = InfluxDBSink.<Long>builder()
+        .setInfluxDBSchemaSerializer(new TestSerializer())
+        .setInfluxDBUrl(getUrl())           // http://localhost:8086
+        .setInfluxDBUsername(getUsername()) // admin
+        .setInfluxDBPassword(getPassword()) // admin
+        .setInfluxDBBucket(getBucket())     // default
+        .setInfluxDBOrganization(getOrg())  // influxdata
+        .build();
+        
+// ...
+
+/**
+ * Implementation of InfluxDBSchemaSerializer interface
+ * (element) -----> (dataPoint)
+ *  1L -----------> test,longValue=1 fieldKey="fieldValue"
+ *  2L -----------> test,longValue=2 fieldKey="fieldValue"
+ *  3L -----------> test,longValue=3 fieldKey="fieldValue"           
+ */
+class TestSerializer implements InfluxDBSchemaSerializer<Long> {
+
+    @Override
+    public Point serialize(Long element, Context context) {
+        final Point dataPoint = new Point("test");
+        dataPoint.addTag("longValue", String.valueOf(element));
+        dataPoint.addField("fieldKey", "fieldValue");
+        return dataPoint;
+    }
+}
+```
+
+### Options
+
+| Option            | Description   | Default Value   |
+| ----------------- |-----------------|:-----------------:|
+| WRITE_DATA_POINT_CHECKPOINT | Determines if the checkpoint data point should be written to InfluxDB or not. | false |
+| WRITE_BUFFER_SIZE | Number of elements to buffer before writing them to InfluxDB. | 1000 |
+| INFLUXDB_URL | InfluxDB Connection URL. | ❌ |
+| INFLUXDB_USERNAME | InfluxDB username. | ❌ |
+| INFLUXDB_PASSWORD | InfluxDB password. | ❌ |
+| INFLUXDB_BUCKET | InfluxDB bucket. | ❌ |
+| INFLUXDB_ORGANIZATION | InfluxDB organization. | ❌ |

Review comment:
       :+1:

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ *     Apache Druid InfluxDB Parser</a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {

Review comment:
       `Number`

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import lombok.Getter;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.
+ */
+public final class DataPoint {
+    @Getter private final String name;
+    private final Map<String, String> tags = new HashMap<>();
+    private final Map<String, Object> fields = new HashMap<>();
+    @Getter private final Number timestamp;

Review comment:
       `Long`?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ *     Apache Druid InfluxDB Parser</a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {
+        if (raw.endsWith("i")) {
+            return Long.valueOf(raw.substring(0, raw.length() - 1));
+        }
+
+        return Double.valueOf(raw);
+    }
+
+    private Object parseBool(final String raw) {

Review comment:
Should probably be `Boolean`.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {

Review comment:
Parameter name copy-and-paste issue: `influxDBUrl` should be `influxDBUsername` here (and likewise in the setters below).

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {

Review comment:
This deserves proper documentation and should probably also get an additional section in the README.md.
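For example, a sketch of such Javadoc, based only on what the README already states (the wording is a suggestion, not the author's):

```java
/**
 * Determines whether a checkpoint data point, e.g.
 * {@code checkpoint checkpoint=flink <timestamp>}, is written to InfluxDB
 * whenever Flink completes a checkpoint. The timestamp is taken from the
 * latest element serialized before the checkpoint.
 *
 * @param shouldWrite true to enable the checkpoint data point (default: false)
 */
public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {
    return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
}
```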

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {
+        return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
+    }
+
+    public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {
+        return this.setProperty(WRITE_BUFFER_SIZE.key(), String.valueOf(bufferSize));
+    }
+
+    public InfluxDBSink<IN> build() {
+        this.sanityCheck();
+        return new InfluxDBSink<>(this.influxDBSchemaSerializer, this.properties);
+    }
+
+    // ------------- private helpers  --------------
+    /**
+     * Set an arbitrary property for the InfluxDBSink. The valid keys can be found in {@link
+     * InfluxDBSinkOptions}.
+     *
+     * @param key the key of the property.
+     * @param value the value of the property.
+     * @return this InfluxDBSinkBuilder.
+     */
+    private InfluxDBSinkBuilder<IN> setProperty(final String key, final String value) {
+        this.properties.setProperty(key, value);
+        return this;
+    }
+
+    /** Checks if the SchemaSerializer and the influxDBConfig are not null and set. */
+    private void sanityCheck() {
+        // Check required settings.
+        checkNotNull(

Review comment:
Also check the other mandatory fields (URL, username, password, bucket, and organization), not just the serializer.
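A sketch of the extended check, using the options already imported in this class (error messages are placeholders):

```java
/** Checks that the serializer and all mandatory connection settings are set. */
private void sanityCheck() {
    checkNotNull(this.influxDBSchemaSerializer, "The schema serializer was not set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_URL.key()), "The InfluxDB URL was not set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_USERNAME.key()), "The InfluxDB username was not set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_PASSWORD.key()), "The InfluxDB password was not set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_BUCKET.key()), "The InfluxDB bucket was not set.");
    checkNotNull(this.properties.getProperty(INFLUXDB_ORGANIZATION.key()), "The InfluxDB organization was not set.");
}
```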

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxDBOptionsBase.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.util.Properties;
+import java.util.function.Function;
+import org.apache.flink.configuration.ConfigOption;
+
+public abstract class InfluxDBOptionsBase {

Review comment:
Base class seems to be unnecessary. How about replacing `Properties` with `org.apache.flink.configuration.Configuration`, which would give you the type safety that you desire?
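A sketch of how `Configuration` provides that type safety (the option key string below is illustrative, not from this PR):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Declare the option once, with a type and a default value:
static final ConfigOption<Integer> WRITE_BUFFER_SIZE =
        ConfigOptions.key("sink.influxdb.write.buffer.size")
                .intType()
                .defaultValue(1000);

// Configuration is typed through ConfigOption, so no String round-trips:
final Configuration configuration = new Configuration();
configuration.set(WRITE_BUFFER_SIZE, 2000);                   // compile-time type check
final int bufferSize = configuration.get(WRITE_BUFFER_SIZE);  // no string parsing
```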

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {
+        return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
+    }
+
+    public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {

Review comment:
Check `bufferSize > 0`. Also describe the latency vs. throughput trade-off?
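A sketch of both points, assuming an additional static import of Flink's `Preconditions.checkArgument`:

```java
public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {
    // Trade-off: a larger buffer batches more points per write (higher throughput),
    // a smaller buffer flushes sooner (lower latency).
    checkArgument(bufferSize > 0, "The buffer size must be greater than 0.");
    return this.setProperty(WRITE_BUFFER_SIZE.key(), String.valueOf(bufferSize));
}
```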

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * This class serializes and deserializes the commit values. Since the timestamp value is sent
+ * as a committable, the Long object is (de)serialized.
+ */
+public final class InfluxDBCommittableSerializer implements SimpleVersionedSerializer<Long> {
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(final Long value) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(0, value);
+        return buffer.array();
+    }
+
+    @Override
+    public Long deserialize(final int version, final byte[] serialized) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);

Review comment:
       `ByteBuffer.wrap`?
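That is, a sketch of the suggested simplification:

```java
@Override
public Long deserialize(final int version, final byte[] serialized) {
    // Wrap the existing array instead of allocating a new buffer and copying into it.
    return ByteBuffer.wrap(serialized).getLong();
}
```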

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Committer;
+
+/**
+ * The InfluxDBCommitter implements the {@link Committer} interface The InfluxDBCommitter is called
+ * whenever a checkpoint is set by Flink. When this class is called it writes a checkpoint data
+ * point in InfluxDB. The checkpoint data point uses the latest written record timestamp.
+ */
+@Slf4j
+public final class InfluxDBCommitter implements Committer<Long> {
+
+    private final InfluxDBClient influxDBClient;
+    private final boolean writeCheckpoint;
+
+    public InfluxDBCommitter(final Properties properties) {
+        this.influxDBClient = getInfluxDBClient(properties);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+    }
+
+    /**
+     * This method is called only when a checkpoint is set and writes a checkpoint data point into
+     * InfluxDB. The {@link
+     * org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter} prepares the
+     * commit and fills the commitable list with the latest timestamp. If the list contains a single
+     * element it will be used as the timestamp of the datapoint. Otherwise when no timestamp is
+     * provided, InfluxDB will use the current timestamp (UTC) of the host machine.
+     *
+     * <p>
+     *
+     * @param committables Contains the latest written timestamp.
+     * @return Empty list
+     * @see <a
+     *     href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#timestamp></a>
+     */
+    @SneakyThrows

Review comment:
       Translate any checked exceptions (are there any?) into `IOException` and use an explicit `throws` clause instead of `@SneakyThrows`.
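   
   Roughly like this (a sketch; `commitInternal` is a hypothetical extraction of the current method body):
   
   ```java
   @Override
   public List<Long> commit(final List<Long> committables) throws IOException {
       try {
           this.commitInternal(committables);
       } catch (final Exception e) {
           throw new IOException("Could not write the checkpoint data point to InfluxDB.", e);
       }
       return Collections.emptyList();
   }
   ```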

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This Class implements the {@link SinkWriter} and it is responsible to write incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the influxDB write API is called and all the items
+ * all written. The {@link #lastTimestamp} keeps track of the biggest timestamp of the incoming
+ * elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());
+                this.elements.add(this.schemaSerializer.serialize(in, context));
+                if (context.timestamp() != null) {
+                    this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+                }
+            }
+        } catch (final Exception e) {
+            log.error(e.getMessage());
+        }
+    }
+
+    /**
+     * This method is called whenever a checkpoint is set by Flink. It creates a list and feels it
+     * up with the latest timestamp.
+     *
+     * @param flush
+     * @return A list containing 0 or 1 element
+     */
+    @Override
+    public List<Long> prepareCommit(final boolean flush) {
+        if (this.lastTimestamp == 0) {
+            return Collections.emptyList();
+        }
+        final List<Long> lastTimestamp = new ArrayList<>(1);
+        lastTimestamp.add(this.lastTimestamp);
+        return lastTimestamp;
+    }
+
+    @Override
+    public List<Point> snapshotState() {
+        return this.elements;
+    }
+
+    @Override
+    public void close() throws Exception {
+        log.debug("Preparing to write the elements in close.");
+        this.writeCurrentElements();
+        log.debug("Closing the writer.");
+        this.elements.clear();
+    }
+
+    public void setProcessingTimerService(final ProcessingTimeService processingTimerService) {
+        this.processingTimerService = processingTimerService;
+    }
+
+    private void writeCurrentElements() throws Exception {
+        try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {

Review comment:
       Did you verify that this is cheap enough to do on each batch? I guess it has to be as this is the only way to do batch writes, right?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This Class implements the {@link SinkWriter} and it is responsible to write incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the influxDB write API is called and all the items
+ * all written. The {@link #lastTimestamp} keeps track of the biggest timestamp of the incoming
+ * elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());

Review comment:
       Remove or `trace`.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This Class implements the {@link SinkWriter} and it is responsible to write incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the influxDB write API is called and all the items
+ * all written. The {@link #lastTimestamp} keeps track of the biggest timestamp of the incoming
+ * elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());
+                this.elements.add(this.schemaSerializer.serialize(in, context));
+                if (context.timestamp() != null) {
+                    this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+                }
+            }
+        } catch (final Exception e) {
+            log.error(e.getMessage());
+        }
+    }
+
+    /**
+     * This method is called whenever a checkpoint is set by Flink. It creates a list and feels it

Review comment:
       fills?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This Class implements the {@link SinkWriter} and it is responsible to write incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the influxDB write API is called and all the items
+ * all written. The {@link #lastTimestamp} keeps track of the biggest timestamp of the incoming
+ * elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());
+                this.elements.add(this.schemaSerializer.serialize(in, context));
+                if (context.timestamp() != null) {
+                    this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+                }
+            }
+        } catch (final Exception e) {
+            log.error(e.getMessage());
+        }
+    }
+
+    /**
+     * This method is called whenever a checkpoint is set by Flink. It creates a list and feels it
+     * up with the latest timestamp.
+     *
+     * @param flush
+     * @return A list containing 0 or 1 element
+     */
+    @Override
+    public List<Long> prepareCommit(final boolean flush) {
+        if (this.lastTimestamp == 0) {
+            return Collections.emptyList();
+        }
+        final List<Long> lastTimestamp = new ArrayList<>(1);
+        lastTimestamp.add(this.lastTimestamp);
+        return lastTimestamp;
+    }
+
+    @Override
+    public List<Point> snapshotState() {
+        return this.elements;
+    }
+
+    @Override
+    public void close() throws Exception {
+        log.debug("Preparing to write the elements in close.");
+        this.writeCurrentElements();
+        log.debug("Closing the writer.");

Review comment:
       Where? ;) Nothing is actually closed here; in particular, `influxDBClient` stays open.
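   
   For instance (sketch):
   
   ```java
   @Override
   public void close() throws Exception {
       log.debug("Preparing to write the elements in close.");
       this.writeCurrentElements();
       this.elements.clear();
       log.debug("Closing the writer.");
       this.influxDBClient.close();
   }
   ```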

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumState;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumStateSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSplitEnumerator;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBRecordEmitter;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSourceReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSplitReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplitSerializer;
+
+/**
+ * The Source implementation of InfluxDB. Please use a {@link InfluxDBSourceBuilder} to construct a
+ * {@link InfluxDBSource}. The following example shows how to create an InfluxDBSource emitting
+ * records of <code>String</code> type.
+ *
+ * <p>See {@link InfluxDBSourceBuilder} for more details.
+ *
+ * @param <OUT> the output type of the source.
+ */
+public final class InfluxDBSource<OUT>
+        implements Source<OUT, InfluxDBSplit, InfluxDBSourceEnumState>, ResultTypeQueryable<OUT> {
+
+    private final Properties properties;
+    private final InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+
+    InfluxDBSource(
+            final Properties properties,
+            final InfluxDBDataPointDeserializer<OUT> deserializationSchema) {
+        this.properties = properties;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    /**
+     * Get a influxDBSourceBuilder to build a {@link InfluxDBSource}.
+     *
+     * @return a InfluxDB source builder.
+     */
+    public static <OUT> InfluxDBSourceBuilder<OUT> builder() {
+        return new InfluxDBSourceBuilder<>();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, InfluxDBSplit> createReader(
+            final SourceReaderContext sourceReaderContext) {
+        final Supplier<InfluxDBSplitReader> splitReaderSupplier =
+                () -> new InfluxDBSplitReader(this.properties);
+        final InfluxDBRecordEmitter<OUT> recordEmitter =
+                new InfluxDBRecordEmitter<>(this.deserializationSchema);
+
+        return new InfluxDBSourceReader<>(
+                splitReaderSupplier,
+                recordEmitter,
+                this.toConfiguration(this.properties),
+                sourceReaderContext);
+    }
+
+    @Override
+    public SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> createEnumerator(
+            final SplitEnumeratorContext<InfluxDBSplit> splitEnumeratorContext) {
+        return new InfluxDBSplitEnumerator(splitEnumeratorContext);
+    }
+
+    @Override
+    public SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> restoreEnumerator(
+            final SplitEnumeratorContext<InfluxDBSplit> splitEnumeratorContext,
+            final InfluxDBSourceEnumState influxDBSourceEnumState) {
+        return null;

Review comment:
       This needs to be implemented. Since you don't have any state, you could just use `new InfluxDBSplitEnumerator(splitEnumeratorContext)`. 
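   
   I.e., something like:
   
   ```java
   @Override
   public SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> restoreEnumerator(
           final SplitEnumeratorContext<InfluxDBSplit> splitEnumeratorContext,
           final InfluxDBSourceEnumState influxDBSourceEnumState) {
       // The source is stateless, so restoring equals creating a fresh enumerator.
       return new InfluxDBSplitEnumerator(splitEnumeratorContext);
   }
   ```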

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceOptions.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import java.util.Properties;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxDBOptionsBase;
+
+/* Configurations for a InfluxDBSource. */
+public final class InfluxDBSourceOptions extends InfluxDBOptionsBase {
+
+    private InfluxDBSourceOptions() {}
+
+    public static final ConfigOption<Long> ENQUEUE_WAIT_TIME =
+            ConfigOptions.key("source.influxDB.timeout.enqueue")
+                    .longType()
+                    .defaultValue(5L)
+                    .withDescription(
+                            "The time out in seconds for enqueuing an HTTP request to the queue.");
+
+    public static final ConfigOption<Integer> INGEST_QUEUE_CAPACITY =
+            ConfigOptions.key("source.influxDB.queue_capacity.ingest")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "Size of queue that buffers HTTP requests data points before fetching.");
+
+    public static final ConfigOption<Integer> MAXIMUM_LINES_PER_REQUEST =
+            ConfigOptions.key("source.influxDB.limit.lines_per_request")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The maximum number of lines that should be parsed per HTTP request.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("source.influxDB.port")
+                    .intType()
+                    .defaultValue(8000)
+                    .withDescription(
+                            "TCP port on which the split reader's HTTP server is running on.");
+
+    public static long getEnqueueWaitTime(final Properties properties) {
+        return getOption(properties, ENQUEUE_WAIT_TIME, Long::parseLong);
+    }
+
+    public static int getIngestQueueCapacity(final Properties properties) {
+        return getOption(properties, INGEST_QUEUE_CAPACITY, Integer::parseInt);
+    }
+
+    public static int getMaximumLinesPerRequest(final Properties properties) {
+        return getOption(properties, MAXIMUM_LINES_PER_REQUEST, Integer::parseInt);
+    }
+
+    public static int getPort(final Properties properties) {
+        return getOption(properties, PORT, Integer::parseInt);
+    }

Review comment:
       These accessors could be removed if you switched from `Properties` to Flink's `Configuration`.
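   
   With a `Configuration`, the typed lookups become one-liners at the call sites (sketch; `config` is a hypothetical `Configuration` instance):
   
   ```java
   final int port = config.get(InfluxDBSourceOptions.PORT);
   final long enqueueWaitTime = config.get(InfluxDBSourceOptions.ENQUEUE_WAIT_TIME);
   ```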

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSourceEnumStateSerializer.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.enumerator;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/** InfluxDB is stateless due to its unreplayable HTTP request source. */

Review comment:
       This needs to be documented in the README.md and in the source code (Javadoc) as well.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.enumerator;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import org.jetbrains.annotations.Nullable;
+
+/** The enumerator class for InfluxDB source. */
+@Internal
+public final class InfluxDBSplitEnumerator
+        implements SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> {
+
+    private final SplitEnumeratorContext<InfluxDBSplit> context;
+
+    public InfluxDBSplitEnumerator(final SplitEnumeratorContext<InfluxDBSplit> context) {
+        this.context = checkNotNull(context);
+    }
+
+    @Override
+    public void start() {
+        // no resources to start
+    }
+
+    @Override
+    public void handleSplitRequest(final int subtaskId, @Nullable final String requesterHostname) {
+        this.context.assignSplit(new InfluxDBSplit(subtaskId), subtaskId);
+    }
+
+    @Override
+    public void addSplitsBack(final List<InfluxDBSplit> splits, final int subtaskId) {}
+
+    @Override
+    public void addReader(final int subtaskId) {
+        // this source is purely lazy-pull-based, nothing to do upon registration
+    }
+
+    @Override
+    public InfluxDBSourceEnumState snapshotState() throws Exception {
+        return null;

Review comment:
       Return the empty state here instead of `null`.
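   
   E.g. (assuming `InfluxDBSourceEnumState` has a no-arg constructor):
   
   ```java
   @Override
   public InfluxDBSourceEnumState snapshotState() {
       // Stateless source: nothing to snapshot, but null would break checkpointing.
       return new InfluxDBSourceEnumState();
   }
   ```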

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.jetbrains.annotations.NotNull;
+
+abstract class Handler implements HttpHandler {

Review comment:
       Quick doc
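   
   For example (wording is just a suggestion):
   
   ```java
   /** Base class for the source's HTTP handlers; factors out the shared response handling. */
   abstract class Handler implements HttpHandler {
   ```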

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {
+        if (raw.endsWith("i")) {
+            return Long.valueOf(raw.substring(0, raw.length() - 1));
+        }
+
+        return new Double(raw);
+    }
+
+    private Object parseBool(final String raw) {
+        final char first = raw.charAt(0);
+        if (first == 't' || first == 'T') {
+            return "true";
+        } else {
+            return "false";
+        }
+    }
+
+    private String parseIdentifier(final InfluxLineProtocolParser.IdentifierContext ctx) {
+        if (ctx.BOOLEAN() != null || ctx.NUMBER() != null) {
+            return ctx.getText();
+        }
+
+        return IDENTIFIER_PATTERN.matcher(ctx.IDENTIFIER_STRING().getText()).replaceAll("$1");
+    }
+
+    private Number parseTimestamp(@Nullable final TimestampContext timestamp) {

Review comment:
       `Long`? The timestamp could be typed as `Long` here instead of the looser `Number`.

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). Our sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml=
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server starts per SplitReader that parses HTTP requests to our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and send to the next Flink operator.
+
+When using Telegraf, you have two choices to configure it. You can either configure its [InfluxDB v2 output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#influxdb_v2) for writing to the running HTTP servers or use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http) for that:
+
+```toml
+[[outputs.influxdb_v2]]
+  urls = ["http://task-manager-1:8000", "http:/task-manager-2:8000"]
+
+# or
+
+[[outputs.http]]
+  url = "http://task-manager-1:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java=
+InfluxDBSource<Long> influxDBSource = InfluxBSource.<Long>builder()
+        .setDeserializer(new TestDeserializer())
+        .build()
+        
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The time out in seconds for enqueuing an HTTP request to the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of queue that buffers HTTP requests data points before fetching. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the split reader's HTTP server is running on. | 8000 |
+
+### Supported Data Types in Field Set
+
+| Field Set     | Support       | 
+| ------------- |:-------------:| 
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |

Review comment:
       The UInt limitation comes directly from Druid, right? Could you check whether there is a ticket for it and link it?

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);

Review comment:
       Instantiating a fresh lexer/parser chain per line seems rather costly. Could you please double-check whether the existing Druid parser objects can be reused across lines?
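   
   ANTLR recognizers can usually be reset rather than re-created; a rough, untested sketch:
   
   ```java
   // Create the ANTLR objects once and reset their inputs per line.
   // (Assumes the parser instance is confined to a single thread.)
   private final InfluxLineProtocolLexer lexer =
           new InfluxLineProtocolLexer(new ANTLRInputStream(""));
   private final CommonTokenStream tokenStream = new CommonTokenStream(this.lexer);
   private final InfluxLineProtocolParser parser =
           new InfluxLineProtocolParser(this.tokenStream);
   
   @Nullable
   public DataPoint parseToDataPoint(final String input) throws ParseException {
       this.lexer.setInputStream(new ANTLRInputStream(input));
       this.tokenStream.setTokenSource(this.lexer);
       this.parser.setTokenStream(this.tokenStream);
       // ... rest of the method unchanged
   }
   ```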

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import java.io.IOException;
+
+public final class HealthCheckHandler extends Handler {

Review comment:
       Quick doc

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+
+@Slf4j
+public final class WriteAPIHandler extends Handler {
+    private final InfluxParser parser = new InfluxParser();
+    private final int maximumLinesPerRequest;
+    private final FutureCompletingBlockingQueue ingestionQueue;
+    private final int threadIndex;
+    private final long enqueueWaitTime;
+
+    public WriteAPIHandler(
+            final int maximumLinesPerRequest,
+            final FutureCompletingBlockingQueue ingestionQueue,
+            final int threadIndex,
+            final long enqueueWaitTime) {
+        this.maximumLinesPerRequest = maximumLinesPerRequest;
+        this.ingestionQueue = ingestionQueue;
+        this.threadIndex = threadIndex;
+        this.enqueueWaitTime = enqueueWaitTime;
+    }
+
+    @Override
+    public void handle(final HttpExchange t) throws IOException {
+        final BufferedReader in =
+                new BufferedReader(
+                        new InputStreamReader(t.getRequestBody(), StandardCharsets.UTF_8));
+
+        try {
+            String line;
+            final List<DataPoint> points = new ArrayList<>();
+            int numberOfLinesParsed = 0;
+            while ((line = in.readLine()) != null) {
+                final DataPoint dataPoint = this.parser.parseToDataPoint(line);
+                points.add(dataPoint);
+                numberOfLinesParsed++;
+                if (numberOfLinesParsed > this.maximumLinesPerRequest) {
+                    throw new RequestTooLargeException(
+                            String.format(
+                                    "Payload too large. Maximum number of lines per request is %d.",
+                                    this.maximumLinesPerRequest));
+                }
+            }

Review comment:
       This part is quite confusing to me and probably inefficient:
   - You split the request into lines.
   - Then you create a new parser for each line and parse it.
   - You create quite a few intermediate objects along the way.
   
   Wouldn't it be possible to just parse the whole input stream through antlr? You would create the parser once per batch and have no intermediate representations; see the sketch below.
   
   You can probably also leverage the `Stream` API to parse the lines as you go, so that you do not exceed your queue limit.
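   
   Rough sketch (untested; `CharStreams.fromStream` needs ANTLR 4.7+, otherwise keep `ANTLRInputStream`):
   
   ```java
   // One ANTLR pass over the entire request body instead of a parser per line.
   final CharStream body =
           CharStreams.fromStream(t.getRequestBody(), StandardCharsets.UTF_8);
   final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(body);
   final InfluxLineProtocolParser parser =
           new InfluxLineProtocolParser(new CommonTokenStream(lexer));
   
   final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
   if (lines.size() > this.maximumLinesPerRequest) {
       throw new RequestTooLargeException(
               String.format(
                       "Payload too large. Maximum number of lines per request is %d.",
                       this.maximumLinesPerRequest));
   }
   // map each LineContext to a DataPoint directly, skipping the per-line String step
   ```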
   

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {

Review comment:
       Return type should be `String`, not `Object`.
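   i.e. same body, narrower return type:

```java
private String parseQuotedString(final String text) {
    return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
}
```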

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import lombok.Getter;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.

Review comment:
       I'd add an example/pattern of a datapoint here.
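   For instance (illustrative values, not taken from the PR):

```java
/**
 * <p>Example: the line protocol record
 * {@code test,region=eu longValue=2i,doubleValue=2.0 1556813561098000000}
 * would be parsed into a DataPoint with measurement name "test", one tag
 * {region=eu}, two fields {longValue=2L, doubleValue=2.0}, and a
 * nanosecond-precision timestamp.
 */
```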

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import com.influxdb.client.write.Point;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import lombok.Getter;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.commiter.InfluxDBCommittableSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.commiter.InfluxDBCommitter;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBPointSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
+
+public final class InfluxDBSink<IN> implements Sink<IN, Long, Point, Void> {

Review comment:
       Add docs + usage example (recycle from readme).
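   Roughly along these lines (a sketch: the setters exist in `InfluxDBSinkBuilder` in this PR, while `builder()`, `build()` and `MySchemaSerializer` are assumed/hypothetical):

```java
final InfluxDBSink<Long> influxDBSink =
        InfluxDBSink.<Long>builder()
                .setInfluxDBSchemaSerializer(new MySchemaSerializer()) // user-provided, hypothetical
                .setInfluxDBUrl("http://localhost:8086")               // illustrative value
                .setInfluxDBUsername("username")
                .setInfluxDBPassword("password")
                .setInfluxDBBucket("bucket")
                .setInfluxDBOrganization("org")
                .build();

sourceStream.sinkTo(influxDBSink);
```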

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.split;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * The {@link org.apache.flink.core.io.SimpleVersionedSerializer serializer} for {@link
+ * InfluxDBSplit}.
+ */
+public final class InfluxDBSplitSerializer implements SimpleVersionedSerializer<InfluxDBSplit> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(final InfluxDBSplit influxDBSplit) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(0, influxDBSplit.getId());
+        return buffer.array();
+    }
+
+    @Override
+    public InfluxDBSplit deserialize(final int version, final byte[] serialized) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);

Review comment:
       `ByteBuffer.wrap` instead of allocating a new buffer and copying.
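   e.g. (sketch; assumes an `InfluxDBSplit` constructor that takes the id):

```java
@Override
public InfluxDBSplit deserialize(final int version, final byte[] serialized) {
    return new InfluxDBSplit(ByteBuffer.wrap(serialized).getLong());
}
```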

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href="https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * @see <a
+ *     href="https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java">
+ *     Apache Druid InfluxDB Parser</a>
+ */
+public class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    @Nullable
+    public DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = this.parseIdentifier(line.identifier());
+
+        final Number timestamp = this.parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> this.parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> this.parseField(t, out));
+
+        return out;
+    }
+
+    private void parseTag(final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = this.parseIdentifier(tag.identifier(0));
+        final String value = this.parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = this.parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = this.parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = this.parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = this.parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private Object parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private Object parseNumber(final String raw) {
+        if (raw.endsWith("i")) {
+            return Long.valueOf(raw.substring(0, raw.length() - 1));
+        }
+
+        return new Double(raw);

Review comment:
       `Double.valueOf` — the `Double(String)` constructor is deprecated.
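   i.e.:

```java
private Object parseNumber(final String raw) {
    if (raw.endsWith("i")) {
        return Long.valueOf(raw.substring(0, raw.length() - 1));
    }
    return Double.valueOf(raw); // avoids the deprecated Double(String) constructor
}
```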

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer;
+
+import java.io.Serializable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+
+/** An interface for the deserialization of InfluxDB data points. */
+public interface InfluxDBDataPointDeserializer<OUT> extends Serializable, ResultTypeQueryable<OUT> {
+
+    /**
+     * Deserialize a data point into an object of type {@code OUT}.
+     *
+     * @param dataPoint the {@code DataPoint} to deserialize.
+     * @throws Exception if the deserialization failed.
+     */
+    OUT deserialize(DataPoint dataPoint) throws Exception;

Review comment:
       `IOException`? A narrower checked exception than `Exception` is easier on implementers and callers.
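   e.g., a narrower signature (sketch):

```java
OUT deserialize(DataPoint dataPoint) throws IOException;
```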

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader;
+
+import com.sun.net.httpserver.HttpServer;

Review comment:
       Please check whether there is an alternative that does not come from `com.sun.net`, or verify that this is portable.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+
+@Slf4j
+public final class WriteAPIHandler extends Handler {
+    private final InfluxParser parser = new InfluxParser();
+    private final int maximumLinesPerRequest;
+    private final FutureCompletingBlockingQueue ingestionQueue;
+    private final int threadIndex;
+    private final long enqueueWaitTime;
+
+    public WriteAPIHandler(
+            final int maximumLinesPerRequest,
+            final FutureCompletingBlockingQueue ingestionQueue,
+            final int threadIndex,
+            final long enqueueWaitTime) {
+        this.maximumLinesPerRequest = maximumLinesPerRequest;
+        this.ingestionQueue = ingestionQueue;
+        this.threadIndex = threadIndex;
+        this.enqueueWaitTime = enqueueWaitTime;
+    }
+
+    @Override
+    public void handle(final HttpExchange t) throws IOException {
+        final BufferedReader in =
+                new BufferedReader(
+                        new InputStreamReader(t.getRequestBody(), StandardCharsets.UTF_8));
+
+        try {
+            String line;
+            final List<DataPoint> points = new ArrayList<>();
+            int numberOfLinesParsed = 0;
+            while ((line = in.readLine()) != null) {
+                final DataPoint dataPoint = this.parser.parseToDataPoint(line);
+                points.add(dataPoint);
+                numberOfLinesParsed++;
+                if (numberOfLinesParsed > this.maximumLinesPerRequest) {
+                    throw new RequestTooLargeException(
+                            String.format(
+                                    "Payload too large. Maximum number of lines per request is %d.",
+                                    this.maximumLinesPerRequest));
+                }
+            }
+
+            final boolean result =
+                    CompletableFuture.supplyAsync(
+                                    () -> {
+                                        try {
+                                            return this.ingestionQueue.put(
+                                                    this.threadIndex, points);
+                                        } catch (final InterruptedException e) {
+                                            return false;
+                                        }
+                                    })
+                            .get(this.enqueueWaitTime, TimeUnit.SECONDS);
+
+            if (!result) {
+                throw new TimeoutException("Failed to enqueue");
+            }

Review comment:
       This should probably be solved in the `FutureCompletingBlockingQueue` but since we cannot extend it here, I'd leave it as is. But you are effectively using a third thread here.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {

Review comment:
       Please check if there are some non-trivial settings that deserve documentation. For example, what's a typical `url` or `bucket`?
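   For example, on the URL setter (sketch; the example value is illustrative):

```java
/**
 * Sets the InfluxDB connection URL, e.g. {@code "http://localhost:8086"}
 * for a local InfluxDB v2 instance.
 */
public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
    return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
}
```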

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);

Review comment:
       `this.setProperty(INFLUXDB_URL.key(), Preconditions.checkNotNull(influxDBUrl))`

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * This class serializes and deserializes the commit values. Since we send the timestamp value
+ * as a committable, the Long object is (de)serialized.
+ */
+public final class InfluxDBCommittableSerializer implements SimpleVersionedSerializer<Long> {
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(final Long value) {

Review comment:
       `value` -> `timestamp`

##########
File path: flink-connector-influxdb2/src/main/resources/log4j.properties
##########
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+status=warn
+appender.console.type=Console
+appender.console.name=LogToConsole
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} - %highlight{%5p} %style{%logger{36}}{cyan} - %m%n%throwable
+### Logger Apache Streaming Connectors ###
+logger.streamingConnectors.name=org.apache.flink.streaming.connectors
+logger.streamingConnectors.level=INFO
+logger.streamingConnectors.additivity=false
+logger.streamingConnectors.appenderRef.console.ref=LogToConsole
+# Root Logger
+rootLogger.level=INFO

Review comment:
       should be OFF in the final PR

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all the items
+ * are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the incoming
+ * elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");

Review comment:
       `debug`

##########
File path: flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/InfluxDBSourceIntegrationTestCase.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.api.client.http.ByteArrayContent;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
+import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.util.ExponentialBackOff;
+import java.net.HttpURLConnection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import lombok.SneakyThrows;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSource;
+import org.apache.flink.streaming.connectors.util.InfluxDBTestDeserializer;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/** Integration test for the InfluxDB source for Flink. */
+class InfluxDBSourceIntegrationTestCase extends TestLogger {
+
+    private static final String HTTP_ADDRESS = "http://localhost";
+    private static final int PORT = 8000;
+
+    private static final HttpRequestFactory HTTP_REQUEST_FACTORY =
+            new NetHttpTransport().createRequestFactory();
+    private static final ExponentialBackOff HTTP_BACKOFF =
+            new ExponentialBackOff.Builder()
+                    .setInitialIntervalMillis(250)
+                    .setMaxElapsedTimeMillis(10000)
+                    .setMaxIntervalMillis(1000)
+                    .setMultiplier(1.3)
+                    .setRandomizationFactor(0.5)
+                    .build();
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
+    private final InfluxDBSource<Long> influxDBSource =
+            InfluxDBSource.<Long>builder()
+                    .setPort(PORT)
+                    .setDeserializer(new InfluxDBTestDeserializer())
+                    .build();
+
+    @BeforeAll

Review comment:
       BeforeEach

##########
File path: flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/util/InfluxDBTestDeserializer.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.util;
+
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+
+public class InfluxDBTestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");

Review comment:
       How about making `getField` generic?
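   A sketch, assuming the backing field map is named `fields` (hypothetical); the unchecked cast moves into the helper:

```java
@SuppressWarnings("unchecked")
public <T> T getField(final String key) {
    return (T) this.fields.get(key);
}
```

   The test deserializer could then simply `return dataPoint.getField("longValue");` without the explicit cast.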

##########
File path: flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/util/InfluxDBContainer.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+@Slf4j
+public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>

Review comment:
       Could also be a contribution to testcontainers.

##########
File path: flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/util/RetentionUnit.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.util;
+
+public enum RetentionUnit {

Review comment:
       Move to container class? A bit overengineered for the use case ;)

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} and is responsible for writing incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all the items
+ * are written. The {@link #lastTimestamp} keeps track of the largest timestamp of the incoming
+ * elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");
+                this.writeCurrentElements();
+                this.elements.clear();
+            } else {
+                log.debug("Adding elements to buffer. Buffer size: {}", this.elements.size());
+                this.elements.add(this.schemaSerializer.serialize(in, context));
+                if (context.timestamp() != null) {
+                    this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+                }
+            }
+        } catch (final Exception e) {
+            log.error(e.getMessage());

Review comment:
       I don't think that swallowing the exception is useful here; it results in data loss.
   If the user desires such behavior, they should catch and log inside their serializer.
   So please remove the try-catch completely.
   It might be best to allow `serializer.serialize` to throw only `IOException` (same for the deserializer), as sketched below.
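   A minimal sketch (assumes `SinkWriter#write` may declare `IOException`; it also buffers the incoming element after a flush instead of dropping it):

```java
@Override
public void write(final IN in, final Context context) throws IOException {
    if (this.elements.size() == this.bufferSize) {
        log.debug("Buffer size reached; writing the elements.");
        this.writeCurrentElements();
        this.elements.clear();
    }
    this.elements.add(this.schemaSerializer.serialize(in, context));
    if (context.timestamp() != null) {
        this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
    }
}
```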

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import com.influxdb.client.write.Point;
+import java.io.Serializable;
+import org.apache.flink.api.connector.sink.SinkWriter.Context;
+
+public interface InfluxDBSchemaSerializer<IN> extends Serializable {
+
+    /**
+     * Serializes input into a InfluxDB point.
+     *
+     * @param element to serialize.
+     * @throws Exception if the serialization failed.
+     */
+    Point serialize(final IN element, final Context context) throws Exception;

Review comment:
       Allowing only `IOException` here would make your life easier.
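   i.e. (sketch):

```java
Point serialize(final IN element, final Context context) throws IOException;
```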

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumState;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumStateSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSplitEnumerator;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBRecordEmitter;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSourceReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSplitReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplitSerializer;
+
+/**
+ * The Source implementation of InfluxDB. Please use a {@link InfluxDBSourceBuilder} to construct a
+ * {@link InfluxDBSource}. The following example shows how to create an InfluxDBSource emitting
+ * records of <code>String</code> type.
+ *
+ * <p>See {@link InfluxDBSourceBuilder} for more details.
+ *
+ * @param <OUT> the output type of the source.
+ */
+public final class InfluxDBSource<OUT>
+        implements Source<OUT, InfluxDBSplit, InfluxDBSourceEnumState>, ResultTypeQueryable<OUT> {
+
+    private final Properties properties;
+    private final InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+
+    InfluxDBSource(
+            final Properties properties,
+            final InfluxDBDataPointDeserializer<OUT> deserializationSchema) {
+        this.properties = properties;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    /**
+     * Get an {@link InfluxDBSourceBuilder} to build a {@link InfluxDBSource}.
+     *
+     * @return an InfluxDB source builder.
+     */
+    public static <OUT> InfluxDBSourceBuilder<OUT> builder() {
+        return new InfluxDBSourceBuilder<>();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, InfluxDBSplit> createReader(
+            final SourceReaderContext sourceReaderContext) {
+        final Supplier<InfluxDBSplitReader> splitReaderSupplier =
+                () -> new InfluxDBSplitReader(this.properties);
+        final InfluxDBRecordEmitter<OUT> recordEmitter =
+                new InfluxDBRecordEmitter<>(this.deserializationSchema);
+
+        return new InfluxDBSourceReader<>(
+                splitReaderSupplier,
+                recordEmitter,
+                this.toConfiguration(this.properties),

Review comment:
       This is a good indication to ditch `Properties` entirely.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+
+public final class InfluxDBRecordEmitter<T> implements RecordEmitter<DataPoint, T, InfluxDBSplit> {
+    private final InfluxDBDataPointDeserializer<T> dataPointDeserializer;
+
+    public InfluxDBRecordEmitter(final InfluxDBDataPointDeserializer<T> dataPointDeserializer) {
+        this.dataPointDeserializer = dataPointDeserializer;
+    }
+
+    @Override
+    public void emitRecord(
+            final DataPoint element, final SourceOutput<T> output, final InfluxDBSplit splitState)
+            throws Exception {
+        output.collect(
+                this.dataPointDeserializer.deserialize(element), (Long) element.getTimestamp());

Review comment:
       Long cast should not be necessary.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Committer;
+
+/**
+ * The InfluxDBCommitter implements the {@link Committer} interface. It is called
+ * whenever a checkpoint is set by Flink. When this class is called, it writes a checkpoint data
+ * point to InfluxDB. The checkpoint data point uses the latest written record timestamp.
+ */
+@Slf4j
+public final class InfluxDBCommitter implements Committer<Long> {
+
+    private final InfluxDBClient influxDBClient;
+    private final boolean writeCheckpoint;
+
+    public InfluxDBCommitter(final Properties properties) {
+        this.influxDBClient = getInfluxDBClient(properties);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+    }
+
+    /**
+     * This method is called only when a checkpoint is set and writes a checkpoint data point into
+     * InfluxDB. The {@link
+     * org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter} prepares the
+     * commit and fills the commitable list with the latest timestamp. If the list contains a single
+     * element it will be used as the timestamp of the datapoint. Otherwise when no timestamp is
+     * provided, InfluxDB will use the current timestamp (UTC) of the host machine.
+     *
+     * <p>
+     *
+     * @param committables Contains the latest written timestamp.
+     * @return Empty list
+     * @see <a
+     *     href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#timestamp></a>
+     */
+    @SneakyThrows
+    @Override
+    public List<Long> commit(final List<Long> committables) {
+        if (this.writeCheckpoint) {
+            log.debug("A checkpoint is set.");
+            Optional<Long> lastTimestamp = Optional.empty();
+            if (committables.size() >= 1) {
+                lastTimestamp = Optional.ofNullable(committables.get(committables.size() - 1));
+            }
+            this.writeCheckpointDataPoint(lastTimestamp);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void close() {
+        this.influxDBClient.close();
+        log.debug("Closing the committer.");
+    }
+
+    private void writeCheckpointDataPoint(final Optional<Long> timestamp) {
+        try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
+            final Point point = new Point("checkpoint");
+            point.addField("checkpoint", "flink");
+            timestamp.ifPresent(aTime -> point.time(aTime, WritePrecision.NS));
+            writeApi.writePoint(point);
+            log.debug("Checkpoint data point write at {}", point.toLineProtocol());
+        }
+    }

Review comment:
       They all seem to be runtime exceptions. I have not seen a single exception from which you could easily recover, so let's just let them bubble through.

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.Properties;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Properties properties;
+
+    public InfluxDBSinkBuilder() {
+        this.influxDBSchemaSerializer = null;
+        this.properties = new Properties();
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        return this.setProperty(INFLUXDB_URL.key(), influxDBUrl);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUsername) {
+        return this.setProperty(INFLUXDB_USERNAME.key(), influxDBUsername);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBPassword) {
+        return this.setProperty(INFLUXDB_PASSWORD.key(), influxDBPassword);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String influxDBBucket) {
+        return this.setProperty(INFLUXDB_BUCKET.key(), influxDBBucket);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String influxDBOrganization) {
+        return this.setProperty(INFLUXDB_ORGANIZATION.key(), influxDBOrganization);
+    }
+
+    public InfluxDBSinkBuilder<IN> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return this;
+    }
+
+    public InfluxDBSinkBuilder<IN> setDataPointCheckpoint(final boolean shouldWrite) {

Review comment:
       Name -> `addCheckpointDataPoint`?
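       E.g. (sketch, assuming it keeps delegating to the same `setProperty` helper as the other setters):
   ```java
   // Sketch of the rename: the flag controls whether a checkpoint data point is
   // written on commit, so the name should say that.
   public InfluxDBSinkBuilder<IN> addCheckpointDataPoint(final boolean shouldWrite) {
       return this.setProperty(WRITE_DATA_POINT_CHECKPOINT.key(), String.valueOf(shouldWrite));
   }
   ```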

##########
File path: flink-connector-influxdb2/README.md
##########
@@ -0,0 +1,202 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). Our sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:UnifiedSinkAPI-SinkAPI).

Review comment:
       I'd probably enhance the description to make clear why Telegraf is used. Maybe even a small diagram:
   ```
   sensor -> telegraf -> [ influx CDC source -> ... -> influx sink ] -> influx db
   ```
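       Something along the lines of: "In a typical setup, sensors report their metrics in Line Protocol to Telegraf, which forwards them to the CDC source; after processing in Flink, the sink writes the results to InfluxDB." (Just a wording suggestion, of course.)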

##########
File path: flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getBufferSizeCapacity;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.writeDataPointCheckpoint;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+/**
+ * This class implements the {@link SinkWriter} interface and is responsible for writing incoming
+ * inputs to InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize each input into a
+ * {@link Point} object. Each serialized object is stored in the {@link #elements} list. Whenever
+ * the size of the list reaches the {@link #bufferSize}, the InfluxDB write API is called and all
+ * buffered items are written. The {@link #lastTimestamp} keeps track of the largest timestamp of
+ * the incoming elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Slf4j
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer, final Properties properties) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = getBufferSizeCapacity(properties);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = writeDataPointCheckpoint(properties);
+        this.influxDBClient = getInfluxDBClient(properties);
+    }
+
+    /**
+     * This method buffers incoming elements and calls the InfluxDB write API whenever the element
+     * list reaches the {@link #bufferSize}. It keeps track of the latest timestamp by comparing
+     * each element's {@code context.timestamp()} with the stored value and keeping the larger
+     * (more recent) one.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) {
+        try {
+            if (this.elements.size() == this.bufferSize) {
+                log.info("Buffer size reached preparing to write the elements.");

Review comment:
       `info` is pretty much the default level for most production setups, and I'd assume this information is not that interesting for most users.
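       I.e. something like:
   ```java
   // Demoting to debug keeps production logs quiet at the default info level.
   log.debug("Buffer size reached; preparing to write the elements.");
   ```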




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org