You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2021/05/25 18:37:14 UTC

[bahir-flink] branch master updated: [BAHIR-274] Add Flink InfluxDBv2.0 Connector

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 809a588  [BAHIR-274] Add Flink InfluxDBv2.0 Connector
809a588 is described below

commit 809a588dfabbc36a8db1abbc0ac9d7c81129c7e5
Author: 1p4pk <le...@gmx.de>
AuthorDate: Sun Dec 6 15:56:53 2020 +0100

    [BAHIR-274] Add Flink InfluxDBv2.0 Connector
    
    Co-authored-by: Leon Papke <le...@gmx.de>
    Co-authored-by: Felix Seidel <fe...@seidel.me>
---
 flink-connector-influxdb2/README.md                | 207 +++++++++++++++++++
 .../media/connector-architecture.png               | Bin 0 -> 30409 bytes
 .../media/source-architecture.png                  | Bin 0 -> 47615 bytes
 flink-connector-influxdb2/pom.xml                  | 142 +++++++++++++
 .../connectors/influxdb/common/DataPoint.java      | 159 +++++++++++++++
 .../connectors/influxdb/common/InfluxParser.java   | 140 +++++++++++++
 .../connectors/influxdb/sink/InfluxDBSink.java     | 113 +++++++++++
 .../influxdb/sink/InfluxDBSinkBuilder.java         | 201 +++++++++++++++++++
 .../influxdb/sink/InfluxDBSinkOptions.java         |  90 +++++++++
 .../commiter/InfluxDBCommittableSerializer.java    |  48 +++++
 .../influxdb/sink/commiter/InfluxDBCommitter.java  |  97 +++++++++
 .../sink/writer/InfluxDBPointSerializer.java       |  64 ++++++
 .../sink/writer/InfluxDBSchemaSerializer.java      |  34 ++++
 .../influxdb/sink/writer/InfluxDBWriter.java       | 136 +++++++++++++
 .../connectors/influxdb/source/InfluxDBSource.java | 122 ++++++++++++
 .../influxdb/source/InfluxDBSourceBuilder.java     | 133 +++++++++++++
 .../influxdb/source/InfluxDBSourceOptions.java     |  55 ++++++
 .../source/enumerator/InfluxDBSourceEnumState.java |  24 +++
 .../InfluxDBSourceEnumStateSerializer.java         |  41 ++++
 .../source/enumerator/InfluxDBSplitEnumerator.java |  65 ++++++
 .../connectors/influxdb/source/http/Handler.java   |  42 ++++
 .../influxdb/source/http/HealthCheckHandler.java   |  36 ++++
 .../influxdb/source/http/WriteAPIHandler.java      | 123 ++++++++++++
 .../source/reader/InfluxDBRecordEmitter.java       |  42 ++++
 .../source/reader/InfluxDBSourceReader.java        |  66 +++++++
 .../source/reader/InfluxDBSplitReader.java         | 181 +++++++++++++++++
 .../InfluxDBDataPointDeserializer.java             |  45 +++++
 .../influxdb/source/split/InfluxDBSplit.java       |  42 ++++
 .../source/split/InfluxDBSplitSerializer.java      |  50 +++++
 .../influxdb/InfluxDBSinkIntegrationTestCase.java  | 166 ++++++++++++++++
 .../InfluxDBSourceIntegrationTestCase.java         | 220 +++++++++++++++++++++
 .../influxdb/common/InfluxParserTest.java          | 154 +++++++++++++++
 .../influxdb/sink/InfluxDBSinkBuilderTest.java     | 132 +++++++++++++
 .../influxdb/source/InfluxDBSourceBuilderTest.java |  33 ++++
 .../influxdb/util/InfluxDBContainer.java           | 108 ++++++++++
 .../influxdb/util/InfluxDBTestDeserializer.java    |  29 +++
 .../influxdb/util/InfluxDBTestSerializer.java      |  34 ++++
 .../src/test/resources/influx-setup.sh             |  25 +++
 .../src/test/resources/log4j2-test.properties      |  46 +++++
 pom.xml                                            |   1 +
 40 files changed, 3446 insertions(+)

diff --git a/flink-connector-influxdb2/README.md b/flink-connector-influxdb2/README.md
new file mode 100644
index 0000000..2a9d9e5
--- /dev/null
+++ b/flink-connector-influxdb2/README.md
@@ -0,0 +1,207 @@
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The Source implements the unified [Data Source API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html). Our sink implements the unified [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:Unif [...]
+
+The InfluxDB Source serves as an output target for [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) (and compatible tools). Telegraf pushes data to the source. The process is push-based, so it is a stateless (non-replayable) source.
+
+![Flink InfluxDB Connector Architecture](media/connector-architecture.png)
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to shade them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more information [here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server per source instance is started. It parses HTTP requests to our Data Point class. That Data Point instance is deserialized by a user-provided implementation of our InfluxDBDataPointDeserializer and sent to the next Flink operator.
+
+When using Telegraf, use its [HTTP output plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.http]]
+  url = "http://task-manager:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxBSource.builder()
+        .setDeserializer(new TestDeserializer())
+        .build()
+
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | The time out in seconds for enqueuing an HTTP request to the queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of queue that buffers HTTP requests data points before fetching. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | The maximum number of lines that should be parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the source's HTTP server is running on. | 8000 |
+
+### Supported Data Types in Field Set
+
+| Field Set     | Support       |
+| ------------- |:-------------:|
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |
+|    String     | ✅            |
+|    Boolean    | ✅            |
+
+See InfluxDB field set value [data type](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set).
+The parsing limitation is related to the Apache Druid project. For more information see this [issue](https://github.com/apache/druid/issues/10993)
+
+
+## Sink
+
+The Sink writes data points to InfluxDB using the [InfluxDB Java Client](https://github.com/influxdata/influxdb-client-java). You provide the connection information (URL, username, password, bucket, and organization) and an implementation of `InfluxDBSchemaSerializer<IN>` generic interface. The implementation of the interface overrides the `serialize(IN element, Context context)` function. This function serializes incoming Flink elements of type `IN` to [Point](https://github.com/influxd [...]
+
+It is possible to write multiple data points to InfluxDB simultaneously by separating each point with a new line. Batching data points in this manner results in much higher performance. The batch size can be set through the `WRITE_BUFFER_SIZE` option. By default, the buffer size is set to 1000 and can be changed to any value using the `setWriteBufferSize(final int bufferSize)` of the Sink builder class.
+
+It is possible to write checkpoint data points to InfluxDB whenever Flink sets a checkpoint. To enable this functionality, you need to set the `WRITE_DATA_POINT_CHECKPOINT` flag to true (default is false). The checkpoint data point looks as follow:
+```text
+checkpoint checkpoint=flink <timestamp>
+```
+The timestamp refers to the latest element that Flink serializes.
+
+### Usage
+
+```java
+// The InfluxDB Sink uses the build pattern to create a Sink object
+InfluxDBSink<Long> influxDBSink = InfluxDBSink.builder()
+        .setInfluxDBSchemaSerializer(new TestSerializer())
+        .setInfluxDBUrl(getUrl())           // http://localhost:8086
+        .setInfluxDBUsername(getUsername()) // admin
+        .setInfluxDBPassword(getPassword()) // admin
+        .setInfluxDBBucket(getBucket())     // default
+        .setInfluxDBOrganization(getOrg())  // influxdata
+        .build();
+
+// ...
+
+/**
+ * Implementation of InfluxDBSchemaSerializer interface
+ * (element) -----> (dataPoint)
+ *  1L -----------> test,longValue=1 fieldKey="fieldValue"
+ *  2L -----------> test,longValue=2 fieldKey="fieldValue"
+ *  3L -----------> test,longValue=3 fieldKey="fieldValue"
+ */
+class TestSerializer implements InfluxDBSchemaSerializer<Long> {
+
+    @Override
+    public Point serialize(Long element, Context context) {
+        final Point dataPoint = new Point("test");
+        dataPoint.addTag("longValue", String.valueOf(element));
+        dataPoint.addField("fieldKey", "fieldValue");
+        return dataPoint;
+    }
+}
+```
+
+### Options
+
+| Option            | Description   | Default Value   |
+| ----------------- |-----------------|:-----------------:|
+| WRITE_DATA_POINT_CHECKPOINT | Determines if the checkpoint data point should be written to InfluxDB or not. | false |
+| WRITE_BUFFER_SIZE | Number of elements to buffer the data before writing them to InfluxDB. | 1000 |
+| INFLUXDB_URL | InfluxDB Connection URL. | ❌ |
+| INFLUXDB_USERNAME | InfluxDB username. | ❌ |
+| INFLUXDB_PASSWORD | InfluxDB password. | ❌ |
+| INFLUXDB_BUCKET | InfluxDB bucket. | ❌ |
+| INFLUXDB_ORGANIZATION | InfluxDB organization. | ❌ |
+
+## Building the connector
+
+The connector can be built by using maven:
+
+```bash
+mvn clean install -DskipTests -pl flink-connector-influxdb2
+```
+
+## Benchmarks
+
+Some basic benchmarks were conducted.
+
+### Source
+A data generator that sends line protocol in form of HTTP requests to an REST endpoint was used for the source benchmarks.
+Throughput and latency was measured for a direct connection between the data generator and the InfluxDB source.
+A setup including Telegraf was used to benchmark the latency in a more realistic setup.
+
+### Sink
+The from sequence source was used to generate data for the sink benchmark.
+Throughput was measured without any other Flink operators, whereas the latency was measured by adding a timestamp to the event using a map operator before the sink.
+This timestamp was then compared to the insertion timestamp set by InfluxDB itself.
+
+### Visualization
+
+The results of these benchmarks are visualized [here](https://docs.google.com/presentation/d/1apd_wys0OzaiifAisABFg4B7HCydbkZXpN0OFd6cjEg/edit?usp=sharing).
+
+
+## Usage and Deployment Example
+
+See [`Shark/flink-connector-influxdb-example`](https://github.com/Shark/flink-connector-influxdb-example) for an example showing you how to use and deploy the InfluxDB source and sink connectors in a Flink application on a Kubernetes cluster.
+
+## Future Work
+
+* [Source] Dynamic (unprivileged) ports for HTTP server
+* [Source] Integration with Kubernetes service discovery in conjunction with dynamic ports
+* [Source] Use multi-threaded HTTP server
+* [Sink] Flush write buffer after an inactivity timeout
+
+## Contributors
+
+<!-- ALL-CONTRIBUTORS-LIST:START - Do not remove or modify this section -->
+<table>
+  <tr class="noBorder">
+    <td class="noBorder" align="center">
+        <a href="https://github.com/1p4pk"><img class="roundImg"
+         src="https://avatars.githubusercontent.com/u/32157576?v=4?s=100"width="100px;"/><br /><sub><b>Leon Papke</b></sub>
+         </a>
+     </td>
+    <td class="noBorder" align="center">
+        <a href="https://github.com/raminqaf"><img class="roundImg" src="https://avatars.githubusercontent.com/u/20357405?v=4?s=100" width="100px;"/><br /><sub><b>Ramin Gharib</b></sub>
+        </a>
+    </td>
+    <td  class="noBorder" align="center">
+    <a href="https://github.com/Shark"><img class="roundImg"  src="https://avatars.githubusercontent.com/u/53632?v=4?s=100" width="100px;" alt=""/>        <br /><sub><b>Felix Seidel</b></sub></a>
+    </td>
+  </tr>
+</table>
+<!-- ALL-CONTRIBUTORS-LIST:END -->
+
+This project follows the [all-contributors](https://github.com/all-contributors/all-contributors) specification. Contributions of any kind welcome!
diff --git a/flink-connector-influxdb2/media/connector-architecture.png b/flink-connector-influxdb2/media/connector-architecture.png
new file mode 100644
index 0000000..c89fdda
Binary files /dev/null and b/flink-connector-influxdb2/media/connector-architecture.png differ
diff --git a/flink-connector-influxdb2/media/source-architecture.png b/flink-connector-influxdb2/media/source-architecture.png
new file mode 100644
index 0000000..9f402e6
Binary files /dev/null and b/flink-connector-influxdb2/media/source-architecture.png differ
diff --git a/flink-connector-influxdb2/pom.xml b/flink-connector-influxdb2/pom.xml
new file mode 100644
index 0000000..5d114bf
--- /dev/null
+++ b/flink-connector-influxdb2/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-influxdb2_2.11</artifactId>
+  <name>flink-connector-influxdb2</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <influxdbClient.version>2.0.0</influxdbClient.version>
+    <druid.version>0.13.0-incubating</druid.version>
+    <!--  Test Properties  -->
+    <testcontainers.version>1.15.2</testcontainers.version>
+    <google.http.client.version>1.39.0</google.http.client.version>
+  </properties>
+
+  <dependencies>
+
+    <!-- Flink  -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-base</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <!-- InfluxDB  -->
+
+    <dependency>
+      <groupId>com.influxdb</groupId>
+      <artifactId>influxdb-client-java</artifactId>
+      <version>${influxdbClient.version}</version>
+    </dependency>
+
+    <!-- InfluxDB Line Protocol Parser by Apache Druid -->
+
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-influx-extensions</artifactId>
+      <version>${druid.version}</version>
+    </dependency>
+
+    <!-- Flink Test Utils -->
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <!-- Test container -->
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>influxdb</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google.http.client.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
new file mode 100644
index 0000000..3539881
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import com.influxdb.Arguments;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * InfluxDB data point class.
+ *
+ * <h3>Elements of line protocol</h3>
+ *
+ * <pre>
+ *
+ * measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200
+ * --------------- --------------- --------------------- -------------------
+ *      |               |                   |                     |
+ * Measurement       Tag set            Field set              Timestamp
+ *
+ * </pre>
+ *
+ * <p>{@link InfluxParser} parses line protocol into this data point representation.
+ */
+public final class DataPoint {
+
+    private final String measurement;
+    private final Map<String, String> tags = new HashMap();
+    private final Map<String, Object> fields = new HashMap();
+    private final Long timestamp;
+
+    DataPoint(final String measurementName, @Nullable final Long timestamp) {
+        Arguments.checkNotNull(measurementName, "measurement");
+        this.measurement = measurementName;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Converts the DataPoint object to {@link Point} object. The default precision for timestamps
+     * is in nanoseconds. For more information about timestamp precision please go to <a
+     * href=https://docs.influxdata.com/influxdb/cloud/write-data/#timestamp-precision>timestamp-precision</a>
+     *
+     * @return {@link Point}.
+     */
+    public Point toPoint() {
+        final Point point = new Point(this.measurement);
+        point.time(this.timestamp, WritePrecision.NS);
+        point.addTags(this.tags);
+        point.addFields(this.fields);
+        return point;
+    }
+
+    /**
+     * Adds key and value to field set.
+     *
+     * @param field Key of field.
+     * @param value Value for the field key.
+     */
+    public void addField(final String field, final Object value) {
+        Arguments.checkNonEmpty(field, "fieldName");
+        this.fields.put(field, value);
+    }
+
+    /**
+     * Gets value for a specific field.
+     *
+     * @param field Key of field.
+     * @return value Value for the field key.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T getField(final String field) {
+        Arguments.checkNonEmpty(field, "fieldName");
+        return (T) this.fields.getOrDefault(field, null);
+    }
+
+    /**
+     * Adds key and value to tag set.
+     *
+     * @param key Key of tag.
+     * @param value Value for the tag key.
+     */
+    public void addTag(final String key, final String value) {
+        Arguments.checkNotNull(key, "tagName");
+        this.tags.put(key, value);
+    }
+
+    /**
+     * Gets value for a specific tag.
+     *
+     * @param key Key of tag.
+     * @return value Value for the tag key.
+     */
+    public String getTag(final String key) {
+        Arguments.checkNotNull(key, "tagName");
+        return this.tags.getOrDefault(key, null);
+    }
+
+    public Long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /**
+     * A point is uniquely identified by the measurement name, tag set, and timestamp. If you submit
+     * line protocol with the same measurement, tag set, and timestamp, but with a different field
+     * set, the field set becomes the union of the old field set and the new field set, where any
+     * conflicts favor the new field set.
+     *
+     * @see <a
+     *     href="https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#duplicate-points">
+     *     Duplicate points </a>
+     * @param obj: Object to compare to
+     * @return Either the object is equal to the data point or not
+     */
+    @Override
+    public boolean equals(final Object obj) {
+
+        // If the object is compared with itself then return true
+        if (obj == this) {
+            return true;
+        }
+
+        /* Check if o is an instance of Complex or not
+        "null instanceof [type]" also returns false */
+        if (!(obj instanceof DataPoint)) {
+            return false;
+        }
+
+        // typecast o to DataPoint so that we can compare data members
+        final DataPoint point = (DataPoint) obj;
+
+        return point.measurement.equals(this.measurement)
+                && point.tags.equals(this.tags)
+                && (point.timestamp.equals(this.timestamp));
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.measurement, this.fields, this.timestamp);
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
new file mode 100644
index 0000000..fd91046
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.druid.data.input.influx.InfluxLineProtocolLexer;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
+import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This is an InfluxDB line protocol parser.
+ *
+ * @see <a href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/">Line
+ *     Protocol</a>
+ * This class contains code copied from the <a
+ *     href=https://github.com/apache/druid/blob/master/extensions-contrib/influx-extensions/src/main/java/org/apache/druid/data/input/influx/InfluxParser.java>
+ *     Apache Druid InfluxDB Parser </a>, licensed under the Apache License, Version 2.0.
+ */
+@Internal
+public final class InfluxParser {
+    private static final Pattern BACKSLASH_PATTERN = Pattern.compile("\\\\\"");
+    private static final Pattern IDENTIFIER_PATTERN = Pattern.compile("\\\\([,= ])");
+
+    private InfluxParser() {}
+
+    public static DataPoint parseToDataPoint(final String input) throws ParseException {
+        final CharStream charStream = new ANTLRInputStream(input);
+        final InfluxLineProtocolLexer lexer = new InfluxLineProtocolLexer(charStream);
+        final TokenStream tokenStream = new CommonTokenStream(lexer);
+        final InfluxLineProtocolParser parser = new InfluxLineProtocolParser(tokenStream);
+
+        final List<InfluxLineProtocolParser.LineContext> lines = parser.lines().line();
+        if (parser.getNumberOfSyntaxErrors() != 0) {
+            throw new ParseException("Unable to parse line.", 0);
+        }
+        if (lines.size() != 1) {
+            throw new ParseException(
+                    "Multiple lines present; unable to parse more than one per record.", 0);
+        }
+
+        final InfluxLineProtocolParser.LineContext line = lines.get(0);
+        final String measurement = parseIdentifier(line.identifier());
+
+        final Long timestamp = parseTimestamp(line.timestamp());
+
+        final DataPoint out = new DataPoint(measurement, timestamp);
+
+        if (line.tag_set() != null) {
+            line.tag_set().tag_pair().forEach(t -> parseTag(t, out));
+        }
+
+        line.field_set().field_pair().forEach(t -> parseField(t, out));
+
+        return out;
+    }
+
+    private static void parseTag(
+            final InfluxLineProtocolParser.Tag_pairContext tag, final DataPoint out) {
+        final String key = parseIdentifier(tag.identifier(0));
+        final String value = parseIdentifier(tag.identifier(1));
+        out.addTag(key, value);
+    }
+
+    private static void parseField(
+            final InfluxLineProtocolParser.Field_pairContext field, final DataPoint out) {
+        final String key = parseIdentifier(field.identifier());
+        final InfluxLineProtocolParser.Field_valueContext valueContext = field.field_value();
+        final Object value;
+        if (valueContext.NUMBER() != null) {
+            value = parseNumber(valueContext.NUMBER().getText());
+        } else if (valueContext.BOOLEAN() != null) {
+            value = parseBool(valueContext.BOOLEAN().getText());
+        } else {
+            value = parseQuotedString(valueContext.QUOTED_STRING().getText());
+        }
+        out.addField(key, value);
+    }
+
+    private static String parseQuotedString(final String text) {
+        return BACKSLASH_PATTERN.matcher(text.substring(1, text.length() - 1)).replaceAll("\"");
+    }
+
+    private static Number parseNumber(final String raw) {
+        if (raw.endsWith("i")) {
+            return Long.valueOf(raw.substring(0, raw.length() - 1));
+        }
+
+        return Double.valueOf(raw);
+    }
+
+    private static Boolean parseBool(final String raw) {
+        final char first = raw.charAt(0);
+        return (first == 't' || first == 'T');
+    }
+
+    private static String parseIdentifier(final InfluxLineProtocolParser.IdentifierContext ctx) {
+        if (ctx.BOOLEAN() != null || ctx.NUMBER() != null) {
+            return ctx.getText();
+        }
+
+        return IDENTIFIER_PATTERN.matcher(ctx.IDENTIFIER_STRING().getText()).replaceAll("$1");
+    }
+
+    private static Long parseTimestamp(@Nullable final TimestampContext timestamp) {
+        if (timestamp == null) {
+            return null;
+        }
+
+        final String strTimestamp = timestamp.getText();
+        // Influx timestamps come in nanoseconds; treat anything less than 1 ms as 0
+        if (strTimestamp.length() < 7) {
+            return 0L;
+        } else {
+            return Long.valueOf(strTimestamp);
+        }
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
new file mode 100644
index 0000000..1b6df26
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import com.influxdb.client.write.Point;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.commiter.InfluxDBCommittableSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.commiter.InfluxDBCommitter;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBPointSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
+
+/**
+ * This Sink implementation of InfluxDB/Line Protocol. Please use a {@link InfluxDBSinkBuilder} to
+ * construct a {@link InfluxDBSink}. The following example shows how to create an InfluxDBSink
+ * having records of <code>Long</code> as input type.
+ *
+ * <pre>{@code
+ * InfluxDBSink<Long> influxDBSink = InfluxDBSink.builder()
+ * .setInfluxDBSchemaSerializer(new InfluxDBSerializer())
+ * .setInfluxDBUrl(getUrl())
+ * .setInfluxDBUsername(getUsername())
+ * .setInfluxDBPassword(getPassword())
+ * .setInfluxDBBucket(getBucket())
+ * .setInfluxDBOrganization(getOrg())
+ * .build();
+ * }</pre>
+ *
+ * <p>See {@link InfluxDBSinkBuilder} for more details.
+ *
+ * @param <IN> type of the input of the sink.
+ */
+public final class InfluxDBSink<IN> implements Sink<IN, Long, Point, Void> {
+
+    private final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Configuration configuration;
+
+    InfluxDBSink(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer,
+            final Configuration configuration) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Get a influxDBSinkBuilder to build a {@link InfluxDBSink}.
+     *
+     * @return a InfluxDB sink builder.
+     */
+    public static <IN> InfluxDBSinkBuilder<IN> builder() {
+        return new InfluxDBSinkBuilder<>();
+    }
+
+    @Override
+    public SinkWriter<IN, Long, Point> createWriter(
+            final InitContext initContext, final List<Point> list) {
+        final InfluxDBWriter<IN> writer =
+                new InfluxDBWriter<>(this.influxDBSchemaSerializer, this.configuration);
+        writer.setProcessingTimerService(initContext.getProcessingTimeService());
+        return writer;
+    }
+
+    @Override
+    public Optional<Committer<Long>> createCommitter() {
+        return Optional.of(new InfluxDBCommitter(this.configuration));
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<Long>> getCommittableSerializer() {
+        return Optional.of(new InfluxDBCommittableSerializer());
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<Point>> getWriterStateSerializer() {
+        return Optional.of(new InfluxDBPointSerializer());
+    }
+
+    @Override
+    public Optional<GlobalCommitter<Long, Void>> createGlobalCommitter() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+        return Optional.empty();
+    }
+
+    public Configuration getConfiguration() {
+        return this.configuration;
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
new file mode 100644
index 0000000..70e3bbe
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
+
+/**
+ * The @builder class for {@link InfluxDBSink} to make it easier for the users to construct a {@link
+ * InfluxDBSink}.
+ *
+ * <p>The following example shows the minimum setup to create a InfluxDBSink that uses the Long
+ * values from a former operator and sends it to an InfluxDB instance.
+ *
+ * <pre>{@code
+ * InfluxDBSink<Long> influxDBSink = InfluxDBSink.builder()
+ * .setInfluxDBSchemaSerializer(new InfluxDBSerializer())
+ * .setInfluxDBUrl(getUrl())
+ * .setInfluxDBUsername(getUsername())
+ * .setInfluxDBPassword(getPassword())
+ * .setInfluxDBBucket(getBucket())
+ * .setInfluxDBOrganization(getOrg())
+ * .build();
+ * }</pre>
+ *
+ * <p>To specify the batch size that has a significant influence on performance, one can call {@link
+ * #setWriteBufferSize(int)}.
+ *
+ * <p>Check the Java docs of each individual methods to learn more about the settings to build a
+ * InfluxDBSource.
+ */
+public final class InfluxDBSinkBuilder<IN> {
+    private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private String influxDBUrl;
+    private String influxDBUsername;
+    private String influxDBPassword;
+    private String bucketName;
+    private String organizationName;
+    private final Configuration configuration;
+
+    InfluxDBSinkBuilder() {
+        this.influxDBUrl = null;
+        this.influxDBUsername = null;
+        this.influxDBPassword = null;
+        this.bucketName = null;
+        this.organizationName = null;
+        this.influxDBSchemaSerializer = null;
+        this.configuration = new Configuration();
+    }
+
+    /**
+     * Sets the InfluxDB url.
+     *
+     * @param influxDBUrl the url of the InfluxDB instance to send data to.
+     * @return this InfluxDBSinkBuilder.
+     */
+    public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
+        this.influxDBUrl = influxDBUrl;
+        this.configuration.setString(INFLUXDB_URL, checkNotNull(influxDBUrl));
+        return this;
+    }
+
+    /**
+     * Sets the InfluxDB user name.
+     *
+     * @param influxDBUsername the user name of the InfluxDB instance.
+     * @return this InfluxDBSinkBuilder.
+     */
+    public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUsername) {
+        this.influxDBUsername = influxDBUsername;
+        this.configuration.setString(INFLUXDB_USERNAME, checkNotNull(influxDBUsername));
+        return this;
+    }
+
+    /**
+     * Sets the InfluxDB password.
+     *
+     * @param influxDBPassword the password of the InfluxDB instance.
+     * @return this InfluxDBSinkBuilder.
+     */
+    public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBPassword) {
+        this.influxDBPassword = influxDBPassword;
+        this.configuration.setString(INFLUXDB_PASSWORD, checkNotNull(influxDBPassword));
+        return this;
+    }
+
+    /**
+     * Sets the InfluxDB bucket name.
+     *
+     * @param bucketName the bucket name of the InfluxDB instance to store the data in.
+     * @return this InfluxDBSinkBuilder.
+     */
+    public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String bucketName) {
+        this.bucketName = bucketName;
+        this.configuration.setString(INFLUXDB_BUCKET, checkNotNull(bucketName));
+        return this;
+    }
+
+    /**
+     * Sets the InfluxDB organization name.
+     *
+     * @param organizationName the organization name of the InfluxDB instance.
+     * @return this InfluxDBSinkBuilder.
+     */
+    public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String organizationName) {
+        this.organizationName = organizationName;
+        this.configuration.setString(INFLUXDB_ORGANIZATION, checkNotNull(organizationName));
+        return this;
+    }
+
+    /**
+     * Sets the {@link InfluxDBSchemaSerializer serializer} of the input type IN for the
+     * InfluxDBSink.
+     *
+     * @param influxDBSchemaSerializer the serializer for the input type.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public <T extends IN> InfluxDBSinkBuilder<T> setInfluxDBSchemaSerializer(
+            final InfluxDBSchemaSerializer<T> influxDBSchemaSerializer) {
+        checkNotNull(influxDBSchemaSerializer);
+        final InfluxDBSinkBuilder<T> sinkBuilder = (InfluxDBSinkBuilder<T>) this;
+        sinkBuilder.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        return sinkBuilder;
+    }
+
+    /**
+     * Sets if the InfluxDBSink should write checkpoint data points to InfluxDB.
+     *
+     * @param shouldWrite boolean if checkpoint should be written.
+     * @return this InfluxDBSinkBuilder.
+     */
+    public InfluxDBSinkBuilder<IN> addCheckpointDataPoint(final boolean shouldWrite) {
+        this.configuration.setBoolean(WRITE_DATA_POINT_CHECKPOINT, shouldWrite);
+        return this;
+    }
+
+    /**
+     * Sets the buffer size of the {@link InfluxDBWriter}. This also determines the number of {@link
+     * com.influxdb.client.write.Point} send to the InfluxDB instance per request.
+     *
+     * @param bufferSize size of the buffer.
+     * @return this InfluxDBSinkBuilder.
+     */
+    public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {
+        if (bufferSize <= 0) {
+            throw new IllegalArgumentException("The buffer size should be greater than 0.");
+        }
+        this.configuration.setInteger(WRITE_BUFFER_SIZE, bufferSize);
+        return this;
+    }
+
+    /**
+     * Build the {@link InfluxDBSink}.
+     *
+     * @return a InfluxDBSink with the settings made for this builder.
+     */
+    public InfluxDBSink<IN> build() {
+        this.sanityCheck();
+        return new InfluxDBSink<>(this.influxDBSchemaSerializer, this.configuration);
+    }
+
+    // ------------- private helpers  --------------
+
+    /** Checks if the SchemaSerializer and the influxDBConfig are not null and set. */
+    private void sanityCheck() {
+        // Check required settings.
+        checkNotNull(this.influxDBUrl, "The InfluxDB URL is required but not provided.");
+        checkNotNull(this.influxDBUsername, "The InfluxDB username is required but not provided.");
+        checkNotNull(this.influxDBPassword, "The InfluxDB password is required but not provided.");
+        checkNotNull(this.bucketName, "The Bucket name is required but not provided.");
+        checkNotNull(this.organizationName, "The Organization name is required but not provided.");
+        checkNotNull(
+                this.influxDBSchemaSerializer,
+                "Serialization schema is required but not provided.");
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
new file mode 100644
index 0000000..97ff44e
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkOptions.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.InfluxDBClientOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+public final class InfluxDBSinkOptions {
+
+    private InfluxDBSinkOptions() {}
+
+    public static final ConfigOption<Boolean> WRITE_DATA_POINT_CHECKPOINT =
+            ConfigOptions.key("sink.influxDB.write.data_point.checkpoint")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Determines if the checkpoint data point should be written to InfluxDB or not.");
+
+    public static final ConfigOption<Integer> WRITE_BUFFER_SIZE =
+            ConfigOptions.key("sink.influxDB.write.buffer.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "Size of the buffer to store the data before writing to InfluxDB.");
+
+    public static final ConfigOption<String> INFLUXDB_URL =
+            ConfigOptions.key("sink.influxDB.client.URL")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB Connection URL.");
+
+    public static final ConfigOption<String> INFLUXDB_USERNAME =
+            ConfigOptions.key("sink.influxDB.client.username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB username.");
+
+    public static final ConfigOption<String> INFLUXDB_PASSWORD =
+            ConfigOptions.key("sink.influxDB.client.password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB password.");
+
+    public static final ConfigOption<String> INFLUXDB_BUCKET =
+            ConfigOptions.key("sink.influxDB.client.bucket")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB bucket name.");
+
+    public static final ConfigOption<String> INFLUXDB_ORGANIZATION =
+            ConfigOptions.key("sink.influxDB.client.organization")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB organization name.");
+
+    public static InfluxDBClient getInfluxDBClient(final Configuration configuration) {
+        final String url = configuration.getString(INFLUXDB_URL);
+        final String username = configuration.getString(INFLUXDB_USERNAME);
+        final String password = configuration.getString(INFLUXDB_PASSWORD);
+        final String bucket = configuration.getString(INFLUXDB_BUCKET);
+        final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
+        final InfluxDBClientOptions influxDBClientOptions =
+                InfluxDBClientOptions.builder()
+                        .url(url)
+                        .authenticate(username, password.toCharArray())
+                        .bucket(bucket)
+                        .org(organization)
+                        .build();
+        return InfluxDBClientFactory.create(influxDBClientOptions);
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
new file mode 100644
index 0000000..9f11662
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import java.nio.ByteBuffer;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * This class Serialize and deserializes the commit values. Since we are sending the timestamp value
+ * as a committable the Long object is (de)serialized.
+ */
+@Internal
+public final class InfluxDBCommittableSerializer implements SimpleVersionedSerializer<Long> {
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(final Long timestamp) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(0, timestamp);
+        return buffer.array();
+    }
+
+    @Override
+    public Long deserialize(final int version, final byte[] serialized) {
+        final ByteBuffer buffer = ByteBuffer.wrap(serialized);
+        return buffer.getLong();
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
new file mode 100644
index 0000000..3872eed
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The InfluxDBCommitter implements the {@link Committer} interface The InfluxDBCommitter is called
+ * whenever a checkpoint is set by Flink. When this class is called it writes a checkpoint data
+ * point in InfluxDB. The checkpoint data point uses the latest written record timestamp.
+ */
+@Internal
+public final class InfluxDBCommitter implements Committer<Long> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InfluxDBCommitter.class);
+
+    private final InfluxDBClient influxDBClient;
+    private final boolean writeCheckpoint;
+
+    public InfluxDBCommitter(final Configuration configuration) {
+        this.influxDBClient = getInfluxDBClient(configuration);
+        this.writeCheckpoint = configuration.getBoolean(WRITE_DATA_POINT_CHECKPOINT);
+    }
+
+    /**
+     * This method is called only when a checkpoint is set and writes a checkpoint data point into
+     * InfluxDB. The {@link
+     * org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter} prepares the
+     * commit and fills the commitable list with the latest timestamp. If the list contains a single
+     * element it will be used as the timestamp of the datapoint. Otherwise when no timestamp is
+     * provided, InfluxDB will use the current timestamp (UTC) of the host machine.
+     *
+     * <p>
+     *
+     * @param committables Contains the latest written timestamp.
+     * @return Empty list
+     * @see <a
+     *     href=https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#timestamp></a>
+     */
+    @Override
+    public List<Long> commit(final List<Long> committables) {
+        if (this.writeCheckpoint) {
+            LOG.debug("A checkpoint is set.");
+            Optional<Long> lastTimestamp = Optional.empty();
+            if (committables.size() >= 1) {
+                lastTimestamp = Optional.ofNullable(committables.get(committables.size() - 1));
+            }
+            this.writeCheckpointDataPoint(lastTimestamp);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void close() {
+        this.influxDBClient.close();
+        LOG.debug("Closing the committer.");
+    }
+
+    private void writeCheckpointDataPoint(final Optional<Long> timestamp) {
+        try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
+            final Point point = new Point("checkpoint");
+            point.addField("checkpoint", "flink");
+            timestamp.ifPresent(aTime -> point.time(aTime, WritePrecision.NS));
+            writeApi.writePoint(point);
+            LOG.debug("Checkpoint data point write at {}", point.toLineProtocol());
+        }
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java
new file mode 100644
index 0000000..4be9b7f
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import com.influxdb.client.write.Point;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+
+@Internal
+public final class InfluxDBPointSerializer implements SimpleVersionedSerializer<Point> {
+
+    private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(final Point point) {
+        final byte[] serialized = point.toLineProtocol().getBytes(CHARSET);
+        final byte[] targetBytes = new byte[Integer.BYTES + serialized.length];
+
+        final ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+        bb.putInt(serialized.length);
+        bb.put(serialized);
+        return targetBytes;
+    }
+
+    @Override
+    public Point deserialize(final int version, final byte[] serialized) throws IOException {
+        final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+        final byte[] targetStringBytes = new byte[bb.getInt()];
+        bb.get(targetStringBytes);
+        final String line = new String(targetStringBytes, CHARSET);
+        try {
+            return InfluxParser.parseToDataPoint(line).toPoint();
+        } catch (final ParseException exception) {
+            throw new IOException("An parse exception occurred during parsing the line", exception);
+        }
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
new file mode 100644
index 0000000..dff4a65
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import com.influxdb.client.write.Point;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.flink.api.connector.sink.SinkWriter.Context;
+
+public interface InfluxDBSchemaSerializer<IN> extends Serializable {
+
+    /**
+     * Serializes input into a InfluxDB point.
+     *
+     * @param element to serialize.
+     * @throws IOException if the serialization failed.
+     */
+    Point serialize(final IN element, final Context context) throws IOException;
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
new file mode 100644
index 0000000..e800c86
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.write.Point;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Class implements the {@link SinkWriter} and it is responsible to write incoming inputs to
+ * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
+ * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
+ * of the list reaches the {@link #bufferSize}, the influxDB write API is called and all the items
+ * all written. The {@link #lastTimestamp} keeps track of the biggest timestamp of the incoming
+ * elements.
+ *
+ * @param <IN> Type of the input
+ * @see WriteApi
+ */
+@Internal
+public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InfluxDBWriter.class);
+
+    private final int bufferSize;
+    private final boolean writeCheckpoint;
+    private long lastTimestamp = 0;
+    private final List<Point> elements;
+    private ProcessingTimeService processingTimerService;
+    private final InfluxDBSchemaSerializer<IN> schemaSerializer;
+    private final InfluxDBClient influxDBClient;
+
+    public InfluxDBWriter(
+            final InfluxDBSchemaSerializer<IN> schemaSerializer,
+            final Configuration configuration) {
+        this.schemaSerializer = schemaSerializer;
+        this.bufferSize = configuration.getInteger(WRITE_BUFFER_SIZE);
+        this.elements = new ArrayList<>(this.bufferSize);
+        this.writeCheckpoint = configuration.getBoolean(WRITE_DATA_POINT_CHECKPOINT);
+        this.influxDBClient = getInfluxDBClient(configuration);
+    }
+
+    /**
+     * This method calls the InfluxDB write API whenever the element list reaches the {@link
+     * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
+     * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
+     *
+     * @param in incoming data
+     * @param context current Flink context
+     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     */
+    @Override
+    public void write(final IN in, final Context context) throws IOException {
+        if (this.elements.size() == this.bufferSize) {
+            LOG.debug("Buffer size reached preparing to write the elements.");
+            this.writeCurrentElements();
+            this.elements.clear();
+        } else {
+            LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
+            this.elements.add(this.schemaSerializer.serialize(in, context));
+            if (context.timestamp() != null) {
+                this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
+            }
+        }
+    }
+
+    /**
+     * This method is called whenever a checkpoint is set by Flink. It creates a list and fills it
+     * up with the latest timestamp.
+     *
+     * @param flush
+     * @return A list containing 0 or 1 element
+     */
+    @Override
+    public List<Long> prepareCommit(final boolean flush) {
+        if (this.lastTimestamp == 0) {
+            return Collections.emptyList();
+        }
+        final List<Long> lastTimestamp = new ArrayList<>(1);
+        lastTimestamp.add(this.lastTimestamp);
+        return lastTimestamp;
+    }
+
+    @Override
+    public List<Point> snapshotState() {
+        return this.elements;
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.debug("Preparing to write the elements in InfluxDB.");
+        this.writeCurrentElements();
+        LOG.debug("Closing the writer.");
+        this.elements.clear();
+    }
+
+    public void setProcessingTimerService(final ProcessingTimeService processingTimerService) {
+        this.processingTimerService = processingTimerService;
+    }
+
+    private void writeCurrentElements() {
+        try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
+            writeApi.writePoints(this.elements);
+            LOG.debug("Wrote {} data points", this.elements.size());
+        }
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
new file mode 100644
index 0000000..fb55eef
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import java.util.function.Supplier;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumState;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSourceEnumStateSerializer;
+import org.apache.flink.streaming.connectors.influxdb.source.enumerator.InfluxDBSplitEnumerator;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBRecordEmitter;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSourceReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBSplitReader;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplitSerializer;
+
+/**
+ * The Source implementation of InfluxDB. Please use a {@link InfluxDBSourceBuilder} to construct a
+ * {@link InfluxDBSource}. The following example shows how to create an InfluxDBSource emitting
+ * records of <code>Long</code> type.
+ *
+ * <pre>{@code
+ * InfluxDBSource<Long> influxDBSource = InfluxBSource.builder()
+ * .setDeserializer(new InfluxDBDeserializer())
+ * .build()
+ * }</pre>
+ *
+ * <p>See {@link InfluxDBSourceBuilder} for more details.
+ *
+ * @param <OUT> the output type of the source.
+ */
+public final class InfluxDBSource<OUT>
+        implements Source<OUT, InfluxDBSplit, InfluxDBSourceEnumState>, ResultTypeQueryable<OUT> {
+
+    private final Configuration configuration;
+    private final InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+
+    InfluxDBSource(
+            final Configuration configuration,
+            final InfluxDBDataPointDeserializer<OUT> deserializationSchema) {
+        this.configuration = configuration;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    /**
+     * Get a influxDBSourceBuilder to build a {@link InfluxDBSource}.
+     *
+     * @return a InfluxDB source builder.
+     */
+    public static <OUT> InfluxDBSourceBuilder<OUT> builder() {
+        return new InfluxDBSourceBuilder<>();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, InfluxDBSplit> createReader(
+            final SourceReaderContext sourceReaderContext) {
+        final Supplier<InfluxDBSplitReader> splitReaderSupplier =
+                () -> new InfluxDBSplitReader(this.configuration);
+        final InfluxDBRecordEmitter<OUT> recordEmitter =
+                new InfluxDBRecordEmitter<>(this.deserializationSchema);
+
+        return new InfluxDBSourceReader<>(
+                splitReaderSupplier, recordEmitter, this.configuration, sourceReaderContext);
+    }
+
+    @Override
+    public SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> createEnumerator(
+            final SplitEnumeratorContext<InfluxDBSplit> splitEnumeratorContext) {
+        return new InfluxDBSplitEnumerator(splitEnumeratorContext);
+    }
+
+    @Override
+    public SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> restoreEnumerator(
+            final SplitEnumeratorContext<InfluxDBSplit> splitEnumeratorContext,
+            final InfluxDBSourceEnumState influxDBSourceEnumState) {
+        return new InfluxDBSplitEnumerator(splitEnumeratorContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<InfluxDBSplit> getSplitSerializer() {
+        return new InfluxDBSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<InfluxDBSourceEnumState> getEnumeratorCheckpointSerializer() {
+        return new InfluxDBSourceEnumStateSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return this.deserializationSchema.getProducedType();
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
new file mode 100644
index 0000000..4ff9c5c
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+
+/**
+ * The @builder class for {@link InfluxDBSource} to make it easier for the users to construct a
+ * {@link InfluxDBSource}.
+ *
+ * <p>The following example shows the minimum setup to create a InfluxDBSource that reads the Long
+ * values from a line protocol source.
+ *
+ * <pre>{@code
+ * InfluxDBSource<Long> influxDBSource = InfluxBSource.builder()
+ * .setDeserializer(new InfluxDBDeserializer())
+ * .build()
+ * }</pre>
+ *
+ * <p>To specify the starting port on which the InfluxDBSource starts its HTTP server, one can call
+ * {@link #setPort(int)}.
+ *
+ * <p>Check the Java docs of each individual methods to learn more about the settings to build a
+ * InfluxDBSource.
+ */
+public final class InfluxDBSourceBuilder<OUT> {
+
+    private InfluxDBDataPointDeserializer<OUT> deserializationSchema;
+    private final Configuration configuration;
+
+    InfluxDBSourceBuilder() {
+        this.deserializationSchema = null;
+        this.configuration = new Configuration();
+    }
+
+    /**
+     * Sets the {@link InfluxDBDataPointDeserializer deserializer} of the {@link
+     * org.apache.flink.streaming.connectors.influxdb.common.DataPoint DataPoint} for the
+     * InfluxDBSource.
+     *
+     * @param dataPointDeserializer the deserializer for InfluxDB {@link
+     *     org.apache.flink.streaming.connectors.influxdb.common.DataPoint DataPoint}.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public <T extends OUT> InfluxDBSourceBuilder<T> setDeserializer(
+            final InfluxDBDataPointDeserializer<T> dataPointDeserializer) {
+        checkNotNull(dataPointDeserializer);
+        final InfluxDBSourceBuilder<T> sourceBuilder = (InfluxDBSourceBuilder<T>) this;
+        sourceBuilder.deserializationSchema = dataPointDeserializer;
+        return sourceBuilder;
+    }
+
+    /**
+     * Sets the enqueue wait time, i.e., the time out of this InfluxDBSource.
+     *
+     * @param timeOut the enqueue wait time to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setEnqueueWaitTime(final long timeOut) {
+        this.configuration.setLong(InfluxDBSourceOptions.ENQUEUE_WAIT_TIME, timeOut);
+        return this;
+    }
+
+    /**
+     * Sets the ingest queue capacity of this InfluxDBSource.
+     *
+     * @param capacity the capacity to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setIngestQueueCapacity(final int capacity) {
+        this.configuration.setInteger(InfluxDBSourceOptions.INGEST_QUEUE_CAPACITY, capacity);
+        return this;
+    }
+
+    /**
+     * Sets the maximum number of lines that should be parsed per HTTP request for this
+     * InfluxDBSource.
+     *
+     * @param max the maximum number of lines to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setMaximumLinesPerRequest(final int max) {
+        this.configuration.setInteger(InfluxDBSourceOptions.MAXIMUM_LINES_PER_REQUEST, max);
+        return this;
+    }
+
+    /**
+     * Sets the TCP port on which the split reader's HTTP server of this InfluxDBSource is running
+     * on.
+     *
+     * @param port the port to use for this InfluxDBSource.
+     * @return this InfluxDBSourceBuilder.
+     */
+    public InfluxDBSourceBuilder<OUT> setPort(final int port) {
+        this.configuration.setInteger(InfluxDBSourceOptions.PORT, port);
+        return this;
+    }
+
+    /**
+     * Build the {@link InfluxDBSource}.
+     *
+     * @return a InfluxDBSource with the settings made for this builder.
+     */
+    public InfluxDBSource<OUT> build() {
+        this.sanityCheck();
+        return new InfluxDBSource<>(this.configuration, this.deserializationSchema);
+    }
+
+    // ------------- private helpers  --------------
+
+    private void sanityCheck() {
+        checkNotNull(
+                this.deserializationSchema, "Deserialization schema is required but not provided.");
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceOptions.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceOptions.java
new file mode 100644
index 0000000..a7bf884
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceOptions.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/* Configurations for a InfluxDBSource. */
+public final class InfluxDBSourceOptions {
+
+    private InfluxDBSourceOptions() {}
+
+    public static final ConfigOption<Long> ENQUEUE_WAIT_TIME =
+            ConfigOptions.key("source.influxDB.timeout.enqueue")
+                    .longType()
+                    .defaultValue(5L)
+                    .withDescription(
+                            "The time out in seconds for enqueuing an HTTP request to the queue.");
+
+    public static final ConfigOption<Integer> INGEST_QUEUE_CAPACITY =
+            ConfigOptions.key("source.influxDB.queue_capacity.ingest")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "Size of queue that buffers HTTP requests data points before fetching.");
+
+    public static final ConfigOption<Integer> MAXIMUM_LINES_PER_REQUEST =
+            ConfigOptions.key("source.influxDB.limit.lines_per_request")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The maximum number of lines that should be parsed per HTTP request.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("source.influxDB.port")
+                    .intType()
+                    .defaultValue(8000)
+                    .withDescription(
+                            "TCP port on which the split reader's HTTP server is running on.");
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSourceEnumState.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSourceEnumState.java
new file mode 100644
index 0000000..149e796
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSourceEnumState.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+
+/** The state of InfluxDB source enumerator. */
+@Internal
+public final class InfluxDBSourceEnumState {}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSourceEnumStateSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSourceEnumStateSerializer.java
new file mode 100644
index 0000000..7214f63
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSourceEnumStateSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/** InfluxDB is stateless due to its unreplayable HTTP request source. */
+@Internal
+public final class InfluxDBSourceEnumStateSerializer
+        implements SimpleVersionedSerializer<InfluxDBSourceEnumState> {
+    @Override
+    public int getVersion() {
+        return 0;
+    }
+
+    @Override
+    public byte[] serialize(final InfluxDBSourceEnumState influxDBSourceEnumState) {
+        return new byte[0];
+    }
+
+    @Override
+    public InfluxDBSourceEnumState deserialize(final int i, final byte[] bytes) {
+        return new InfluxDBSourceEnumState();
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
new file mode 100644
index 0000000..b7a2c11
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.enumerator;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+import org.jetbrains.annotations.Nullable;
+
+/** The enumerator class for InfluxDB source. */
+@Internal
+public final class InfluxDBSplitEnumerator
+        implements SplitEnumerator<InfluxDBSplit, InfluxDBSourceEnumState> {
+
+    private final SplitEnumeratorContext<InfluxDBSplit> context;
+
+    public InfluxDBSplitEnumerator(final SplitEnumeratorContext<InfluxDBSplit> context) {
+        this.context = checkNotNull(context);
+    }
+
+    @Override
+    public void start() {
+        // no resources to start
+    }
+
+    @Override
+    public void handleSplitRequest(final int subtaskId, @Nullable final String requesterHostname) {
+        this.context.assignSplit(new InfluxDBSplit(subtaskId), subtaskId);
+    }
+
+    @Override
+    public void addSplitsBack(final List<InfluxDBSplit> splits, final int subtaskId) {}
+
+    @Override
+    public void addReader(final int subtaskId) {
+        // this source is purely lazy-pull-based, nothing to do upon registration
+    }
+
+    @Override
+    public InfluxDBSourceEnumState snapshotState() {
+        return new InfluxDBSourceEnumState();
+    }
+
+    @Override
+    public void close() {}
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
new file mode 100644
index 0000000..e71b684
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.flink.annotation.Internal;
+import org.jetbrains.annotations.NotNull;
+
+/** Abstract base handle class for creating a response */
+@Internal
+abstract class Handler implements HttpHandler {
+
+    static final int HTTP_TOO_MANY_REQUESTS = 415;
+
+    static void sendResponse(
+            @NotNull final HttpExchange t, final int responseCode, @NotNull final String message)
+            throws IOException {
+        final byte[] response = message.getBytes();
+        t.sendResponseHeaders(responseCode, response.length);
+        final OutputStream os = t.getResponseBody();
+        os.write(response);
+        os.close();
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
new file mode 100644
index 0000000..ec4d45d
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Handles incoming health check requests from /health path. If the server is running a response
+ * code 200 is sent
+ */
+@Internal
+public final class HealthCheckHandler extends Handler {
+
+    @Override
+    public void handle(final HttpExchange t) throws IOException {
+        Handler.sendResponse(t, HttpURLConnection.HTTP_OK, "ready for writes");
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
new file mode 100644
index 0000000..1c2bba0
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.http;
+
+import com.sun.net.httpserver.HttpExchange;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles the incoming requests through the path /api/v2/write. The handle function
+ * reads each line in the body and uses the {@link InfluxParser} to pars them to {@link DataPoint}
+ * objects.
+ */
+@Internal
+public final class WriteAPIHandler extends Handler {
+    private static final Logger LOG = LoggerFactory.getLogger(WriteAPIHandler.class);
+
+    private final int maximumLinesPerRequest;
+    private final FutureCompletingBlockingQueue ingestionQueue;
+    private final int threadIndex;
+    private final long enqueueWaitTime;
+
+    public WriteAPIHandler(
+            final int maximumLinesPerRequest,
+            final FutureCompletingBlockingQueue ingestionQueue,
+            final int threadIndex,
+            final long enqueueWaitTime) {
+        this.maximumLinesPerRequest = maximumLinesPerRequest;
+        this.ingestionQueue = ingestionQueue;
+        this.threadIndex = threadIndex;
+        this.enqueueWaitTime = enqueueWaitTime;
+    }
+
+    @Override
+    public void handle(final HttpExchange t) throws IOException {
+        final BufferedReader in =
+                new BufferedReader(
+                        new InputStreamReader(t.getRequestBody(), StandardCharsets.UTF_8));
+
+        try {
+            String line;
+            final List<DataPoint> points = new ArrayList<>();
+            int numberOfLinesParsed = 0;
+            while ((line = in.readLine()) != null) {
+                final DataPoint dataPoint = InfluxParser.parseToDataPoint(line);
+                points.add(dataPoint);
+                numberOfLinesParsed++;
+                if (numberOfLinesParsed > this.maximumLinesPerRequest) {
+                    throw new RequestTooLargeException(
+                            String.format(
+                                    "Payload too large. Maximum number of lines per request is %d.",
+                                    this.maximumLinesPerRequest));
+                }
+            }
+
+            final boolean result =
+                    CompletableFuture.supplyAsync(
+                                    () -> {
+                                        try {
+                                            return this.ingestionQueue.put(
+                                                    this.threadIndex, points);
+                                        } catch (final InterruptedException e) {
+                                            return false;
+                                        }
+                                    })
+                            .get(this.enqueueWaitTime, TimeUnit.SECONDS);
+
+            if (!result) {
+                throw new TimeoutException("Failed to enqueue");
+            }
+
+            t.sendResponseHeaders(HttpURLConnection.HTTP_NO_CONTENT, -1);
+            this.ingestionQueue.notifyAvailable();
+        } catch (final ParseException e) {
+            Handler.sendResponse(t, HttpURLConnection.HTTP_BAD_REQUEST, e.getMessage());
+        } catch (final RequestTooLargeException e) {
+            Handler.sendResponse(t, HttpURLConnection.HTTP_ENTITY_TOO_LARGE, e.getMessage());
+        } catch (final TimeoutException e) {
+            Handler.sendResponse(t, HTTP_TOO_MANY_REQUESTS, "Server overloaded");
+            LOG.error(e.getMessage());
+        } catch (final ExecutionException | InterruptedException e) {
+            Handler.sendResponse(t, HttpURLConnection.HTTP_INTERNAL_ERROR, "Server Error");
+            LOG.error(e.getMessage());
+        }
+    }
+
+    private static class RequestTooLargeException extends RuntimeException {
+        RequestTooLargeException(final String message) {
+            super(message);
+        }
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
new file mode 100644
index 0000000..1612bc5
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader;
+
+import java.io.IOException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+
+@Internal
+public final class InfluxDBRecordEmitter<T> implements RecordEmitter<DataPoint, T, InfluxDBSplit> {
+    private final InfluxDBDataPointDeserializer<T> dataPointDeserializer;
+
+    public InfluxDBRecordEmitter(final InfluxDBDataPointDeserializer<T> dataPointDeserializer) {
+        this.dataPointDeserializer = dataPointDeserializer;
+    }
+
+    @Override
+    public void emitRecord(
+            final DataPoint element, final SourceOutput<T> output, final InfluxDBSplit splitState)
+            throws IOException {
+        output.collect(this.dataPointDeserializer.deserialize(element), element.getTimestamp());
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java
new file mode 100644
index 0000000..1b75b84
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+
+/** The source reader for the InfluxDB line protocol. */
+@Internal
+public final class InfluxDBSourceReader<OUT>
+        extends SingleThreadMultiplexSourceReaderBase<
+                DataPoint, OUT, InfluxDBSplit, InfluxDBSplit> {
+
+    public InfluxDBSourceReader(
+            final Supplier<InfluxDBSplitReader> splitReaderSupplier,
+            final RecordEmitter<DataPoint, OUT, InfluxDBSplit> recordEmitter,
+            final Configuration config,
+            final SourceReaderContext context) {
+        super(splitReaderSupplier::get, recordEmitter, config, context);
+    }
+
+    @Override
+    public void start() {
+        // we request a split only if we did not get splits during the checkpoint restore
+        if (this.getNumberOfCurrentlyAssignedSplits() == 0) {
+            this.context.sendSplitRequest();
+        }
+    }
+
+    @Override
+    protected void onSplitFinished(final Map<String, InfluxDBSplit> map) {
+        this.context.sendSplitRequest();
+    }
+
+    @Override
+    protected InfluxDBSplit initializedState(final InfluxDBSplit influxDBSplit) {
+        return influxDBSplit;
+    }
+
+    @Override
+    protected InfluxDBSplit toSplitType(final String s, final InfluxDBSplit influxDBSplitState) {
+        return influxDBSplitState;
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
new file mode 100644
index 0000000..ddfcb33
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader;
+
+import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.ENQUEUE_WAIT_TIME;
+import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.INGEST_QUEUE_CAPACITY;
+import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.MAXIMUM_LINES_PER_REQUEST;
+import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.PORT;
+
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.source.http.HealthCheckHandler;
+import org.apache.flink.streaming.connectors.influxdb.source.http.WriteAPIHandler;
+import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
+
+/**
+ * A {@link SplitReader} implementation that reads records from InfluxDB splits.
+ *
+ * <p>The returned type are in the format of {@link DataPoint}.
+ */
+@Internal
+public final class InfluxDBSplitReader implements SplitReader<DataPoint, InfluxDBSplit> {
+
+    private final long enqueueWaitTime;
+    private final int maximumLinesPerRequest;
+    private final int defaultPort;
+
+    private HttpServer server = null;
+
+    private final FutureCompletingBlockingQueue<List<DataPoint>> ingestionQueue;
+
+    private InfluxDBSplit split;
+
+    public InfluxDBSplitReader(final Configuration configuration) {
+        this.enqueueWaitTime = configuration.getLong(ENQUEUE_WAIT_TIME);
+        this.maximumLinesPerRequest = configuration.getInteger(MAXIMUM_LINES_PER_REQUEST);
+        this.defaultPort = configuration.getInteger(PORT);
+        final int capacity = configuration.getInteger(INGEST_QUEUE_CAPACITY);
+        this.ingestionQueue = new FutureCompletingBlockingQueue<>(capacity);
+    }
+
+    @Override
+    public RecordsWithSplitIds<DataPoint> fetch() throws IOException {
+        if (this.split == null) {
+            return null;
+        }
+        final InfluxDBSplitRecords recordsBySplits = new InfluxDBSplitRecords(this.split.splitId());
+
+        try {
+            this.ingestionQueue.getAvailabilityFuture().get();
+        } catch (final InterruptedException | ExecutionException exception) {
+            throw new IOException("An exception occurred during fetch", exception);
+        }
+        final List<DataPoint> requests = this.ingestionQueue.poll();
+        if (requests == null) {
+            recordsBySplits.prepareForRead();
+            return recordsBySplits;
+        }
+        recordsBySplits.addAll(requests);
+        recordsBySplits.prepareForRead();
+        return recordsBySplits;
+    }
+
+    @Override
+    public void handleSplitsChanges(final SplitsChange<InfluxDBSplit> splitsChange) {
+        if (splitsChange.splits().isEmpty()) {
+            return;
+        }
+        this.split = splitsChange.splits().get(0);
+
+        if (this.server != null) {
+            return;
+        }
+        try {
+            this.server = HttpServer.create(new InetSocketAddress(this.defaultPort), 0);
+        } catch (final IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Unable to start HTTP Server on Port %d: %s",
+                            this.defaultPort, e.getMessage()));
+        }
+
+        this.server.createContext(
+                "/api/v2/write",
+                new WriteAPIHandler(
+                        this.maximumLinesPerRequest,
+                        this.ingestionQueue,
+                        this.split.splitId().hashCode(),
+                        this.enqueueWaitTime));
+        this.server.createContext("/health", new HealthCheckHandler());
+        this.server.setExecutor(null); // creates a default executor
+        this.server.start();
+    }
+
+    @Override
+    public void wakeUp() {
+        this.ingestionQueue.notifyAvailable();
+    }
+
+    @Override
+    public void close() {
+        if (this.server != null) {
+            this.server.stop(1); // waits max 1 second for pending requests to finish
+        }
+    }
+
+    // ---------------- private helper class --------------------
+
+    private static class InfluxDBSplitRecords implements RecordsWithSplitIds<DataPoint> {
+        private final List<DataPoint> records;
+        private Iterator<DataPoint> recordIterator;
+        private final String splitId;
+
+        private InfluxDBSplitRecords(final String splitId) {
+            this.splitId = splitId;
+            this.records = new ArrayList<>();
+        }
+
+        private boolean addAll(final List<DataPoint> records) {
+            return this.records.addAll(records);
+        }
+
+        private void prepareForRead() {
+            this.recordIterator = this.records.iterator();
+        }
+
+        @Override
+        @Nullable
+        public String nextSplit() {
+            if (this.recordIterator.hasNext()) {
+                return this.splitId;
+            }
+            return null;
+        }
+
+        @Override
+        @Nullable
+        public DataPoint nextRecordFromSplit() {
+            if (this.recordIterator.hasNext()) {
+                return this.recordIterator.next();
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public Set<String> finishedSplits() {
+            return Collections.emptySet();
+        }
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
new file mode 100644
index 0000000..0773b2a
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+
+/** An interface for the deserialization of InfluxDB data points. */
+public interface InfluxDBDataPointDeserializer<OUT> extends Serializable, ResultTypeQueryable<OUT> {
+
+    /**
+     * Deserialize a data point into the given collector.
+     *
+     * @param dataPoint the {@code DataPoint} to deserialize.
+     * @throws IOException if the deserialization failed.
+     */
+    OUT deserialize(DataPoint dataPoint) throws IOException;
+
+    // static function for single data point
+
+    @Override
+    default TypeInformation<OUT> getProducedType() {
+        return TypeExtractor.createTypeInfo(
+                InfluxDBDataPointDeserializer.class, this.getClass(), 0, null, null);
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplit.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplit.java
new file mode 100644
index 0000000..afcbc4f
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplit.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.split;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
+
+/** A {@link SourceSplit} for a InfluxDB split. */
+@Internal
+public final class InfluxDBSplit implements SourceSplit {
+
+    /** The unique ID of the split. Unique within the scope of this source. */
+    private final long id;
+
+    public InfluxDBSplit(final long id) {
+        this.id = id;
+    }
+
+    @Override
+    public String splitId() {
+        return String.valueOf(this.id);
+    }
+
+    public long getId() {
+        return this.id;
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
new file mode 100644
index 0000000..4d1da83
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source.split;
+
+import java.nio.ByteBuffer;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * The {@link org.apache.flink.core.io.SimpleVersionedSerializer serializer} for {@link
+ * InfluxDBSplit}.
+ */
+@Internal
+public final class InfluxDBSplitSerializer implements SimpleVersionedSerializer<InfluxDBSplit> {
+
+    private static final int CURRENT_VERSION = 0;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(final InfluxDBSplit influxDBSplit) {
+        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(0, influxDBSplit.getId());
+        return buffer.array();
+    }
+
+    @Override
+    public InfluxDBSplit deserialize(final int version, final byte[] serialized) {
+        final ByteBuffer buffer = ByteBuffer.wrap(serialized);
+        return new InfluxDBSplit(buffer.getLong());
+    }
+}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
new file mode 100644
index 0000000..75671c8
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import com.influxdb.query.FluxRecord;
+import com.influxdb.query.FluxTable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSink;
+import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBContainer;
+import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestSerializer;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+class InfluxDBSinkIntegrationTestCase extends TestLogger {
+
+    @Container
+    public static final InfluxDBContainer<?> influxDBContainer =
+            InfluxDBContainer.createWithDefaultTag();
+
+    private static final List<Long> SOURCE_DATA = Arrays.asList(1L, 2L, 3L);
+
+    private static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
+            SOURCE_DATA.stream()
+                    .map(x -> new InfluxDBTestSerializer().serialize(x, null).toLineProtocol())
+                    .collect(Collectors.toList());
+
+    /**
+     * Test the following topology.
+     *
+     * <pre>
+     *     1L,2L,3L           "test,longValue=1 fieldKey="fieldValue"",
+     *                        "test,longValue=2 fieldKey="fieldValue"",
+     *                        "test,longValue=3 fieldKey="fieldValue""
+     *     (source2/2) -----> (sink1/1)
+     * </pre>
+     *
+     * Source collects twice and calls each time 2 checkpoint. In total 4 checkpoints are set. In
+     * some cases there are more than 4 checkpoints set.
+     */
+    @Test
+    void testSinkDataToInfluxDB() throws Exception {
+        final StreamExecutionEnvironment env = buildStreamEnv();
+
+        final InfluxDBSink<Long> influxDBSink =
+                InfluxDBSink.builder()
+                        .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
+                        .setInfluxDBUrl(influxDBContainer.getUrl())
+                        .setInfluxDBUsername(InfluxDBContainer.username)
+                        .setInfluxDBPassword(InfluxDBContainer.password)
+                        .setInfluxDBBucket(InfluxDBContainer.bucket)
+                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                        .addCheckpointDataPoint(true)
+                        .build();
+
+        env.addSource(new FiniteTestSource(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO)
+                .sinkTo(influxDBSink);
+
+        env.execute();
+
+        final InfluxDBClient client = getInfluxDBClient(influxDBSink.getConfiguration());
+        final List<String> actualWrittenPoints = queryWrittenData(client);
+
+        assertEquals(actualWrittenPoints.size(), EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.size());
+
+        final List<String> actualCheckpoints = queryCheckpoints(client);
+        assertTrue(actualCheckpoints.size() >= 4);
+    }
+
+    // ---------------- private helper methods --------------------
+
+    private static StreamExecutionEnvironment buildStreamEnv() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        return env;
+    }
+
+    private static List<String> queryWrittenData(final InfluxDBClient influxDBClient) {
+        final List<String> dataPoints = new ArrayList<>();
+
+        final String query =
+                String.format(
+                        "from(bucket: \"%s\") |> "
+                                + "range(start: -1h) |> "
+                                + "filter(fn:(r) => r._measurement == \"test\")",
+                        InfluxDBContainer.bucket);
+        final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
+        for (final FluxTable table : tables) {
+            for (final FluxRecord record : table.getRecords()) {
+                dataPoints.add(recordToDataPoint(record).toLineProtocol());
+            }
+        }
+        return dataPoints;
+    }
+
+    private static List<String> queryCheckpoints(final InfluxDBClient influxDBClient) {
+        final List<String> commitDataPoints = new ArrayList<>();
+
+        final String query =
+                String.format(
+                        "from(bucket: \"%s\") |> "
+                                + "range(start: -1h) |> "
+                                + "filter(fn:(r) => r._measurement == \"checkpoint\")",
+                        InfluxDBContainer.bucket);
+
+        final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
+        for (final FluxTable table : tables) {
+            for (final FluxRecord record : table.getRecords()) {
+                commitDataPoints.add(recordToCheckpointDataPoint(record).toLineProtocol());
+            }
+        }
+        return commitDataPoints;
+    }
+
+    private static Point recordToDataPoint(final FluxRecord record) {
+        final String tagKey = "longValue";
+        final Point point = new Point(record.getMeasurement());
+        point.addTag(tagKey, String.valueOf(record.getValueByKey(tagKey)));
+        point.addField(
+                Objects.requireNonNull(record.getField()), String.valueOf(record.getValue()));
+        point.time(record.getTime(), WritePrecision.NS);
+        return point;
+    }
+
+    private static Point recordToCheckpointDataPoint(final FluxRecord record) {
+        final Point point = new Point(record.getMeasurement());
+        point.addField(
+                Objects.requireNonNull(record.getField()), String.valueOf(record.getValue()));
+        point.time(record.getTime(), WritePrecision.NS);
+        return point;
+    }
+}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.java
new file mode 100644
index 0000000..8165533
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.api.client.http.ByteArrayContent;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
+import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.util.ExponentialBackOff;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSource;
+import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestDeserializer;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+/** Integration test for the InfluxDB source for Flink. */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class InfluxDBSourceIntegrationTestCase extends TestLogger {
+
+    private static final String HTTP_ADDRESS = "http://localhost";
+    private int port = 0;
+
+    private static final HttpRequestFactory HTTP_REQUEST_FACTORY =
+            new NetHttpTransport().createRequestFactory();
+    private static final ExponentialBackOff HTTP_BACKOFF =
+            new ExponentialBackOff.Builder()
+                    .setInitialIntervalMillis(250)
+                    .setMaxElapsedTimeMillis(10000)
+                    .setMaxIntervalMillis(1000)
+                    .setMultiplier(1.3)
+                    .setRandomizationFactor(0.5)
+                    .build();
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
+
+    @BeforeEach
+    void init() {
+        CollectSink.VALUES.clear();
+        try (final ServerSocket serverSocket = new ServerSocket(0)) {
+            this.port = serverSocket.getLocalPort();
+            this.log.info("Using port {} for the HTTP server", this.port);
+        } catch (final IOException ioException) {
+            this.log.error("Could not open open port {}", ioException.getMessage());
+        }
+    }
+
+    /**
+     * Test the following topology.
+     *
+     * <pre>
+     *     test longValue=1i 1     +1            2
+     *     test longValue=2i 1     +1            3
+     *     (source) ------------> (map) -----> (sink)
+     * </pre>
+     */
+    // Test is disabled since it is not passing the Travis pipeline.
+    @Test
+    @Disabled
+    void testIncrementPipeline() throws Exception {
+        final InfluxDBSource<Long> influxDBSource =
+                InfluxDBSource.builder()
+                        .setPort(this.port)
+                        .setDeserializer(new InfluxDBTestDeserializer())
+                        .build();
+
+        this.env
+                .fromSource(influxDBSource, WatermarkStrategy.noWatermarks(), "InfluxDBSource")
+                .map(new IncrementMapFunction())
+                .addSink(new CollectSink());
+
+        final JobClient jobClient = this.env.executeAsync();
+        assertTrue(this.checkHealthCheckAvailable());
+
+        final int writeResponseCode =
+                this.writeToInfluxDB("test longValue=1i 1\ntest longValue=2i 2");
+
+        assertEquals(writeResponseCode, HttpURLConnection.HTTP_NO_CONTENT);
+
+        jobClient.cancel();
+
+        final Collection<Long> results = new ArrayList<>();
+        results.add(2L);
+        results.add(3L);
+        assertTrue(CollectSink.VALUES.containsAll(results));
+    }
+
+    @Test
+    void testBadRequestException() throws Exception {
+        final InfluxDBSource<Long> influxDBSource =
+                InfluxDBSource.builder()
+                        .setPort(this.port)
+                        .setDeserializer(new InfluxDBTestDeserializer())
+                        .build();
+
+        this.env
+                .fromSource(influxDBSource, WatermarkStrategy.noWatermarks(), "InfluxDBSource")
+                .map(new IncrementMapFunction())
+                .addSink(new CollectSink());
+
+        final JobClient jobClient = this.env.executeAsync();
+        assertTrue(this.checkHealthCheckAvailable());
+        final HttpResponseException thrown =
+                Assertions.assertThrows(
+                        HttpResponseException.class,
+                        () -> this.writeToInfluxDB("malformedLineProtocol_test"));
+        assertTrue(thrown.getMessage().contains("Unable to parse line."));
+        jobClient.cancel();
+    }
+
+    @Test
+    void testRequestTooLargeException() throws Exception {
+        final InfluxDBSource<Long> influxDBSource =
+                InfluxDBSource.builder()
+                        .setPort(this.port)
+                        .setDeserializer(new InfluxDBTestDeserializer())
+                        .setMaximumLinesPerRequest(2)
+                        .build();
+        this.env
+                .fromSource(influxDBSource, WatermarkStrategy.noWatermarks(), "InfluxDBSource")
+                .map(new IncrementMapFunction())
+                .addSink(new CollectSink());
+
+        final JobClient jobClient = this.env.executeAsync();
+        assertTrue(this.checkHealthCheckAvailable());
+
+        final String lines = "test longValue=1i 1\ntest longValue=1i 1\ntest longValue=1i 1";
+        final HttpResponseException thrown =
+                Assertions.assertThrows(
+                        HttpResponseException.class, () -> this.writeToInfluxDB(lines));
+        assertTrue(
+                thrown.getMessage()
+                        .contains("Payload too large. Maximum number of lines per request is 2."));
+        jobClient.cancel();
+    }
+
+    private int writeToInfluxDB(final String line) throws IOException {
+        final HttpContent content = ByteArrayContent.fromString("text/plain; charset=utf-8", line);
+        final HttpRequest request =
+                HTTP_REQUEST_FACTORY.buildPostRequest(
+                        new GenericUrl(
+                                String.format("%s:%s/api/v2/write", HTTP_ADDRESS, this.port)),
+                        content);
+        return request.execute().getStatusCode();
+    }
+
+    private boolean checkHealthCheckAvailable() throws IOException {
+        final HttpRequest request =
+                HTTP_REQUEST_FACTORY.buildGetRequest(
+                        new GenericUrl(String.format("%s:%s/health", HTTP_ADDRESS, this.port)));
+
+        request.setUnsuccessfulResponseHandler(
+                new HttpBackOffUnsuccessfulResponseHandler(HTTP_BACKOFF));
+        request.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(HTTP_BACKOFF));
+
+        final int statusCode = request.execute().getStatusCode();
+        return statusCode == HttpURLConnection.HTTP_OK;
+    }
+
+    // ---------------- private helper class --------------------
+
+    /** Simple incrementation with map. */
+    private static class IncrementMapFunction implements MapFunction<Long, Long> {
+
+        @Override
+        public Long map(final Long record) {
+            return record + 1;
+        }
+    }
+
+    /** create a simple testing sink */
+    private static class CollectSink implements SinkFunction<Long> {
+
+        // must be static
+        public static final List<Long> VALUES = Collections.synchronizedList(new ArrayList<>());
+
+        @Override
+        public void invoke(final Long value) {
+            VALUES.add(value);
+        }
+    }
+}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java
new file mode 100644
index 0000000..71d9529
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.common;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.text.ParseException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class InfluxParserTest {
+
+    @Test
+    void shouldParseLineWithTagAndFieldStringToPoint() throws ParseException {
+        final String lineProtocol =
+                "test,testTagKey=testTagValue testFieldKey=\"testFieldValue\" 1556813561098000000";
+        final DataPoint actualDataPoint = InfluxParser.parseToDataPoint(lineProtocol);
+
+        final DataPoint expectedDataPoint = new DataPoint("test", 1556813561098000000L);
+        expectedDataPoint.addTag("testTagKey", "testTagValue");
+        expectedDataPoint.addField("testFieldKey", "testFieldValue");
+
+        assertEquals(expectedDataPoint, actualDataPoint);
+    }
+
+    @Test
+    void shouldParseNotDuplicatedLineToDataPoint() throws ParseException {
+        final String lineProtocol =
+                "test,testTagKey=testTagValue testFieldKey=\"testFieldValue\" 1556813561098000000";
+        final DataPoint dataPoint = InfluxParser.parseToDataPoint(lineProtocol);
+
+        final String notEqualLineProtocol =
+                "test,testTagKey=diff testFieldKey=\"testFieldValue\" 1556813561098000000";
+        final DataPoint notEqualDataPoint = InfluxParser.parseToDataPoint(notEqualLineProtocol);
+
+        assertNotEquals(dataPoint, notEqualDataPoint);
+
+        final String equalLineProtocol =
+                "test,testTagKey=testTagValue testFieldKey=\"testFieldValue\" 1556813561098000000";
+        final DataPoint equalDataPoint = InfluxParser.parseToDataPoint(equalLineProtocol);
+
+        assertEquals(equalDataPoint, dataPoint);
+    }
+
+    @Test
+    void shouldParseLineWithNoTimestamp() throws ParseException {
+        final String lineProtocol = "test,testTagKey=testTagValue testFieldKey=\"testFieldValue\"";
+        final DataPoint actualDataPoint = InfluxParser.parseToDataPoint(lineProtocol);
+
+        assert actualDataPoint != null;
+        assertNull(actualDataPoint.getTimestamp());
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"t", "T", "true", "True", "TRUE"})
+    void shouldParseLineWithFieldBoolAsTrueToPoint(final String fieldValue) throws ParseException {
+        final String lineProtocol =
+                String.format("test testFieldKey=%s 1556813561098000000", fieldValue);
+        final DataPoint actualDataPoint = InfluxParser.parseToDataPoint(lineProtocol);
+
+        assert actualDataPoint != null;
+        assertEquals(actualDataPoint.getField("testFieldKey"), true);
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"f", "F", "false", "False", "FALSE"})
+    void shouldParseLineWithFieldBoolAsFalseToPoint(final String fieldValue) throws ParseException {
+        final String lineProtocol =
+                String.format("test testFieldKey=%s 1556813561098000000", fieldValue);
+        final DataPoint actualDataPoint = InfluxParser.parseToDataPoint(lineProtocol);
+
+        assert actualDataPoint != null;
+        assertEquals(actualDataPoint.getField("testFieldKey"), false);
+    }
+
+    @Test
+    void shouldParseLineWithFieldValueAsFloatToDataPoint() throws ParseException {
+        final String lineProtocol = "test testFieldKey=-1.0 1556813561098000000";
+        final DataPoint actualDataPoint = InfluxParser.parseToDataPoint(lineProtocol);
+
+        assert actualDataPoint != null;
+        final double expectedFieldValue = -1.0;
+        assertEquals(expectedFieldValue, actualDataPoint.getField("testFieldKey"));
+    }
+
+    @Test
+    void shouldParseLineWithFieldValueAsIntegerToDataPoint() throws ParseException {
+        final String lineProtocol = "test testFieldKey=123456i 1556813561098000000";
+        final DataPoint actualDataPoint = InfluxParser.parseToDataPoint(lineProtocol);
+
+        assert actualDataPoint != null;
+        final Long expectedFieldValue = 123456L;
+        assertEquals(expectedFieldValue, actualDataPoint.getField("testFieldKey"));
+    }
+
+    @Test
+    void shouldNotParseLineWithFieldValueAsUnsignedIntegerToDataPoint() {
+        final String lineProtocol = "test testFieldKey=123u 1556813561098000000";
+
+        assertThrows(
+                ParseException.class,
+                () -> InfluxParser.parseToDataPoint(lineProtocol),
+                "Unable to parse line.");
+    }
+
+    @Test
+    void shouldParseLineWithFieldValueAsStringToDataPoint() throws ParseException {
+        final String lineProtocol = "test testFieldKey=\"testFieldValue\" 1556813561098000000";
+        final DataPoint actualDataPoint = InfluxParser.parseToDataPoint(lineProtocol);
+
+        assert actualDataPoint != null;
+        assertEquals("testFieldValue", actualDataPoint.getField("testFieldKey"));
+    }
+
+    @Test
+    void shouldNotParseLineWithNoMeasurement() {
+        final String lineProtocol =
+                "testTagKey=testTagValue testFieldKey=\"testFieldValue\" 1556813561098000000";
+
+        assertThrows(
+                ParseException.class,
+                () -> InfluxParser.parseToDataPoint(lineProtocol),
+                "Unable to parse line.");
+    }
+
+    @Test
+    void shouldNotParseLineWithNoFieldSet() {
+        final String lineProtocol = "test,testTagKey=123u 1556813561098000000";
+
+        assertThrows(
+                ParseException.class,
+                () -> InfluxParser.parseToDataPoint(lineProtocol),
+                "Unable to parse line.");
+    }
+}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
new file mode 100644
index 0000000..1d47bb5
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBContainer;
+import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestSerializer;
+import org.junit.jupiter.api.Test;
+
+class InfluxDBSinkBuilderTest {
+
+    @Test
+    void shouldNotBuildSinkWhenURLIsNotProvided() {
+        final NullPointerException exception =
+                assertThrows(
+                        NullPointerException.class,
+                        () ->
+                                InfluxDBSink.builder()
+                                        .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
+                                        .setInfluxDBUsername(InfluxDBContainer.username)
+                                        .setInfluxDBPassword(InfluxDBContainer.password)
+                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
+                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .build());
+        assertEquals(exception.getMessage(), "The InfluxDB URL is required but not provided.");
+    }
+
+    @Test
+    void shouldNotBuildSinkWhenUsernameIsNotProvided() {
+        final NullPointerException exception =
+                assertThrows(
+                        NullPointerException.class,
+                        () ->
+                                InfluxDBSink.builder()
+                                        .setInfluxDBUrl("http://localhost:8086")
+                                        .setInfluxDBPassword(InfluxDBContainer.password)
+                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
+                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
+                                        .build());
+        assertEquals(exception.getMessage(), "The InfluxDB username is required but not provided.");
+    }
+
+    @Test
+    void shouldNotBuildSinkWhenPasswordIsNotProvided() {
+        final NullPointerException exception =
+                assertThrows(
+                        NullPointerException.class,
+                        () ->
+                                InfluxDBSink.builder()
+                                        .setInfluxDBUrl("http://localhost:8086")
+                                        .setInfluxDBUsername(InfluxDBContainer.username)
+                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
+                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
+                                        .build());
+        assertEquals(exception.getMessage(), "The InfluxDB password is required but not provided.");
+    }
+
+    @Test
+    void shouldNotBuildSinkWhenBucketIsNotProvided() {
+        final NullPointerException exception =
+                assertThrows(
+                        NullPointerException.class,
+                        () ->
+                                InfluxDBSink.builder()
+                                        .setInfluxDBUrl("http://localhost:8086")
+                                        .setInfluxDBUsername(InfluxDBContainer.username)
+                                        .setInfluxDBPassword(InfluxDBContainer.password)
+                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
+                                        .build());
+        assertEquals(exception.getMessage(), "The Bucket name is required but not provided.");
+    }
+
+    @Test
+    void shouldNotBuildSinkWhenOrganizationIsNotProvided() {
+        final NullPointerException exception =
+                assertThrows(
+                        NullPointerException.class,
+                        () ->
+                                InfluxDBSink.builder()
+                                        .setInfluxDBUrl("http://localhost:8086")
+                                        .setInfluxDBUsername(InfluxDBContainer.username)
+                                        .setInfluxDBPassword(InfluxDBContainer.password)
+                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
+                                        .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
+                                        .build());
+        assertEquals(exception.getMessage(), "The Organization name is required but not provided.");
+    }
+
+    @Test
+    void shouldNotBuildSinkWhenSchemaSerializerIsNotProvided() {
+        final NullPointerException exception =
+                assertThrows(
+                        NullPointerException.class,
+                        () ->
+                                InfluxDBSink.builder()
+                                        .setInfluxDBUrl("http://localhost:8086")
+                                        .setInfluxDBUsername(InfluxDBContainer.username)
+                                        .setInfluxDBPassword(InfluxDBContainer.password)
+                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
+                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .build());
+        assertEquals(exception.getMessage(), "Serialization schema is required but not provided.");
+    }
+
+    @Test
+    void shouldNotBuildSinkWhenBufferSizeIsZero() {
+        final IllegalArgumentException exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> InfluxDBSink.builder().setWriteBufferSize(0));
+        assertEquals(exception.getMessage(), "The buffer size should be greater than 0.");
+    }
+}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java
new file mode 100644
index 0000000..4045b8c
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.source;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+class InfluxDBSourceBuilderTest {
+    @Test
+    void shouldNotBuildSourceWhenSchemaDeserializerIsNotProvided() {
+        final NullPointerException exception =
+                assertThrows(NullPointerException.class, () -> InfluxDBSource.builder().build());
+        assertEquals(
+                exception.getMessage(), "Deserialization schema is required but not provided.");
+    }
+}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
new file mode 100644
index 0000000..0f7347a
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
+        extends GenericContainer<SELF> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InfluxDBContainer.class);
+
+    public static final Integer INFLUXDB_PORT = 8086;
+
+    private static final String REGISTRY = "quay.io";
+    private static final String REPOSITORY = "influxdb/influxdb";
+    private static final String TAG = "v2.0.2";
+    private static final DockerImageName DEFAULT_IMAGE_NAME =
+            DockerImageName.parse(String.format("%s/%s:%s", REGISTRY, REPOSITORY, TAG));
+    private static final int NO_CONTENT_STATUS_CODE = 204;
+    private static final String INFLUX_SETUP_SH = "influx-setup.sh";
+
+    public static final String username = "test-user";
+    public static final String password = "test-password";
+    public static final String bucket = "test-bucket";
+    public static final String organization = "test-org";
+    private static final int retention = 0;
+    private static final String retentionUnit = "ns";
+
+    private InfluxDBContainer(final DockerImageName imageName) {
+        super(imageName);
+        imageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
+        this.setEnv();
+        this.waitStrategy =
+                (new WaitAllStrategy())
+                        .withStrategy(
+                                Wait.forHttp("/ping")
+                                        .withBasicCredentials(username, password)
+                                        .forStatusCode(NO_CONTENT_STATUS_CODE))
+                        .withStrategy(Wait.forListeningPort());
+
+        this.addExposedPort(INFLUXDB_PORT);
+        this.startContainer();
+    }
+
+    public static InfluxDBContainer<?> createWithDefaultTag() {
+        LOG.info("Starting influxDB test container with default tag {}", DEFAULT_IMAGE_NAME);
+        return new InfluxDBContainer<>(DEFAULT_IMAGE_NAME);
+    }
+
+    private void setEnv() {
+        this.addEnv("INFLUXDB_USER", username);
+        this.addEnv("INFLUXDB_PASSWORD", password);
+        this.addEnv("INFLUXDB_BUCKET", bucket);
+        this.addEnv("INFLUXDB_ORG", organization);
+        this.addEnv("INFLUXDB_RETENTION", String.valueOf(retention));
+        this.addEnv("INFLUXDB_RETENTION_UNIT", retentionUnit);
+    }
+
+    private void startContainer() {
+        this.withCopyFileToContainer(
+                MountableFile.forClasspathResource(INFLUX_SETUP_SH),
+                String.format("%s", INFLUX_SETUP_SH));
+        this.start();
+        this.setUpInfluxDB();
+        LOG.info("Started InfluxDB container on: {}", this.getUrl());
+    }
+
+    private void setUpInfluxDB() {
+        final ExecResult execResult;
+        try {
+            execResult = this.execInContainer("chmod", "-x", String.format("/%s", INFLUX_SETUP_SH));
+            assertEquals(execResult.getExitCode(), 0);
+            final ExecResult writeResult =
+                    this.execInContainer("/bin/bash", String.format("/%s", INFLUX_SETUP_SH));
+            assertEquals(writeResult.getExitCode(), 0);
+        } catch (final InterruptedException | IOException e) {
+            LOG.error("An error occurred while setting up InfluxDB {}", e.getMessage());
+        }
+    }
+
+    public String getUrl() {
+        return "http://" + this.getHost() + ":" + this.getMappedPort(INFLUXDB_PORT);
+    }
+}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestDeserializer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestDeserializer.java
new file mode 100644
index 0000000..d231d89
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestDeserializer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.util;
+
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
+
+public class InfluxDBTestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return dataPoint.getField("longValue");
+    }
+}
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
new file mode 100644
index 0000000..c48ed98
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.util;
+
+import com.influxdb.client.write.Point;
+import org.apache.flink.api.connector.sink.SinkWriter.Context;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+import org.jetbrains.annotations.Nullable;
+
+public class InfluxDBTestSerializer implements InfluxDBSchemaSerializer<Long> {
+
+    @Override
+    public Point serialize(final Long element, @Nullable final Context context) {
+        final Point dataPoint = new Point("test");
+        dataPoint.addTag("longValue", String.valueOf(element));
+        dataPoint.addField("fieldKey", "fieldValue");
+        return dataPoint;
+    }
+}
diff --git a/flink-connector-influxdb2/src/test/resources/influx-setup.sh b/flink-connector-influxdb2/src/test/resources/influx-setup.sh
new file mode 100644
index 0000000..6372cb2
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/resources/influx-setup.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+influx setup \
+       --force \
+       --username "${INFLUXDB_USER}" \
+       --password "${INFLUXDB_PASSWORD}" \
+       --bucket "${INFLUXDB_BUCKET}" \
+       --org "${INFLUXDB_ORG}" \
+       --retention "${INFLUXDB_RETENTION}""${INFLUXDB_RETENTION_UNIT}"
diff --git a/flink-connector-influxdb2/src/test/resources/log4j2-test.properties b/flink-connector-influxdb2/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..651441b
--- /dev/null
+++ b/flink-connector-influxdb2/src/test/resources/log4j2-test.properties
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+status=warn
+appender.console.type=Console
+appender.console.name=LogToConsole
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} - %highlight{%5p} %style{%logger{36}}{cyan} - %m%n%throwable
+### Logger test containers ###
+logger.testContainers.name=org.testcontainers
+logger.testContainers.level=WARN
+logger.testContainers.additivity=false
+logger.testContainers.appenderRef.console.ref=LogToConsole
+### Logger Docker Java ###
+logger.dockerJava.name=com.github.dockerjava
+logger.dockerJava.level=WARN
+logger.dockerJava.additivity=false
+logger.dockerJava.appenderRef.console.ref=LogToConsole
+### Logger Apache Flink ###
+logger.apacheFlink.name=org.apache.flink
+logger.apacheFlink.level=WARN
+logger.apacheFlink.additivity=false
+logger.apacheFlink.appenderRef.console.ref=LogToConsole
+### Logger Apache Streaming Connectors ###
+logger.streamingConnectors.name=org.apache.flink.streaming.connectors
+logger.streamingConnectors.level=INFO
+logger.streamingConnectors.additivity=false
+logger.streamingConnectors.appenderRef.console.ref=LogToConsole
+# Root Logger
+rootLogger.level=OFF
diff --git a/pom.xml b/pom.xml
index 57eb579..26a8d98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
     <module>flink-connector-akka</module>
     <module>flink-connector-flume</module>
     <module>flink-connector-influxdb</module>
+    <module>flink-connector-influxdb2</module>
     <module>flink-connector-kudu</module>
     <module>flink-connector-netty</module>
     <module>flink-connector-pinot</module>