Posted to commits@bahir.apache.org by es...@apache.org on 2023/05/09 13:57:00 UTC

[bahir-flink] 02/05: [BAHIR-308] Bump flink version to 1.15.3

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit 06fe56a73540c100f8699d2af29e396330452d95
Author: Joao Boto <bo...@boto.pro>
AuthorDate: Wed Mar 8 17:11:35 2023 +0100

    [BAHIR-308] Bump flink version to 1.15.3
---
 .github/workflows/maven-ci.yml                     |  27 +-
 flink-connector-activemq/pom.xml                   |   6 +-
 flink-connector-akka/pom.xml                       |   2 +-
 flink-connector-flume/pom.xml                      |   4 +-
 flink-connector-influxdb/pom.xml                   |   4 +-
 flink-connector-influxdb2/pom.xml                  |   4 +-
 .../connectors/influxdb/common/DataPoint.java      |   3 +-
 .../connectors/influxdb/common/InfluxParser.java   |   9 +-
 .../connectors/influxdb/sink/InfluxDBSink.java     |   5 +-
 .../influxdb/sink/InfluxDBSinkBuilder.java         |  15 +-
 .../commiter/InfluxDBCommittableSerializer.java    |   3 +-
 .../influxdb/sink/commiter/InfluxDBCommitter.java  |  15 +-
 .../sink/writer/InfluxDBPointSerializer.java       |   7 +-
 .../sink/writer/InfluxDBSchemaSerializer.java      |   3 +-
 .../influxdb/sink/writer/InfluxDBWriter.java       |  17 +-
 .../connectors/influxdb/sink2/InfluxDBSink.java    |  62 ++++
 .../{sink => sink2}/InfluxDBSinkBuilder.java       |  27 +-
 .../influxdb/sink2/InfluxDBSinkOptions.java        | 101 ++++++
 .../writer/InfluxDBSchemaSerializer.java           |   5 +-
 .../{sink => sink2}/writer/InfluxDBWriter.java     | 102 +++---
 .../connectors/influxdb/source/InfluxDBSource.java |  10 +-
 .../influxdb/source/InfluxDBSourceBuilder.java     |   4 +-
 .../source/enumerator/InfluxDBSplitEnumerator.java |   7 +-
 .../connectors/influxdb/source/http/Handler.java   |   5 +-
 .../influxdb/source/http/HealthCheckHandler.java   |   3 +-
 .../influxdb/source/http/WriteAPIHandler.java      |  13 +-
 .../source/reader/InfluxDBRecordEmitter.java       |   3 +-
 .../source/reader/InfluxDBSourceReader.java        |   5 +-
 .../source/reader/InfluxDBSplitReader.java         |  22 +-
 .../InfluxDBDataPointDeserializer.java             |   5 +-
 .../source/split/InfluxDBSplitSerializer.java      |   3 +-
 .../influxdb/common/InfluxParserTest.java          |  10 +-
 .../influxdb/sink/InfluxDBSinkBuilderTest.java     |  59 ++--
 .../InfluxDBSinkIntegrationTestCase.java           |  49 +--
 .../writer}/InfluxDBTestSerializer.java            |   3 +-
 .../InfluxDBSinkIntegrationTestCase.java           |  54 +--
 .../writer}/InfluxDBTestSerializer.java            |   5 +-
 .../influxdb/source/InfluxDBSourceBuilderTest.java |   4 +-
 .../InfluxDBSourceIntegrationTestCase.java         |  39 +--
 .../reader}/InfluxDBTestDeserializer.java          |   2 +-
 .../influxdb/util/InfluxDBContainer.java           |  76 +----
 ...Container.java => InfluxDBContainerCustom.java} |  49 +--
 flink-connector-kudu/pom.xml                       |  16 +-
 .../kudu/connector/ColumnSchemasFactory.java       |   1 -
 .../kudu/connector/CreateTableOptionsFactory.java  |   1 -
 .../connectors/kudu/connector/KuduFilterInfo.java  |   1 -
 .../connectors/kudu/connector/KuduTableInfo.java   |   3 +-
 .../convertor/RowResultRowDataConvertor.java       |   6 +-
 .../kudu/connector/failure/KuduFailureHandler.java |   1 -
 .../kudu/connector/reader/KuduReader.java          |  10 +-
 .../kudu/connector/reader/KuduReaderConfig.java    |   3 +-
 .../writer/AbstractSingleOperationMapper.java      |   1 -
 .../kudu/connector/writer/KuduOperationMapper.java |   1 -
 .../kudu/connector/writer/KuduWriter.java          |   9 +-
 .../kudu/connector/writer/KuduWriterConfig.java    |   3 +-
 .../kudu/connector/writer/PojoOperationMapper.java |   6 +-
 .../connectors/kudu/format/KuduOutputFormat.java   |   1 -
 .../flink/connectors/kudu/streaming/KuduSink.java  |   1 -
 .../kudu/table/AbstractReadOnlyCatalog.java        |  22 +-
 .../flink/connectors/kudu/table/KuduCatalog.java   |  35 +-
 .../connectors/kudu/table/KuduTableFactory.java    |  28 +-
 .../connectors/kudu/table/KuduTableSource.java     |  13 +-
 .../kudu/table/UpsertOperationMapper.java          |   1 -
 .../kudu/table/dynamic/KuduDynamicTableSource.java |  40 ++-
 .../table/dynamic/catalog/KuduDynamicCatalog.java  |  32 +-
 .../kudu/table/utils/KuduTableUtils.java           |  12 +-
 .../connectors/kudu/table/utils/KuduTypeUtils.java |  14 +-
 .../connectors/kudu/connector/KuduTestBase.java    |  19 +-
 .../kudu/format/KuduOutputFormatTest.java          |   3 +-
 .../connectors/kudu/streaming/KuduSinkTest.java    |   1 -
 .../connectors/kudu/table/KuduCatalogTest.java     |   1 -
 .../kudu/table/KuduTableFactoryTest.java           |  11 +-
 .../kudu/table/KuduTableSourceITCase.java          |   4 +-
 .../connectors/kudu/table/KuduTableSourceTest.java |  18 +-
 .../connectors/kudu/table/KuduTableTestUtils.java  |   4 +-
 .../kudu/writer/AbstractOperationTest.java         |   8 +-
 .../kudu/writer/PojoOperationMapperTest.java       |   3 +-
 .../kudu/writer/RowOperationMapperTest.java        |   1 -
 .../kudu/writer/TupleOpertaionMapperTest.java      |   1 -
 flink-connector-netty/pom.xml                      |   2 +-
 .../netty/example/StreamSqlExample.scala           |  11 +-
 flink-connector-pinot/pom.xml                      |  14 +-
 .../streaming/connectors/pinot/PinotSink.java      | 376 ---------------------
 .../PinotSinkGlobalCommittableSerializer.java      |  83 -----
 .../streaming/connectors/pinot/v2/PinotSink.java   | 128 +++++++
 .../connectors/pinot/v2/PinotSinkBuilder.java      | 165 +++++++++
 .../{ => v2}/committer/PinotSinkCommittable.java   |   4 +-
 .../committer}/PinotSinkCommittableSerializer.java |   3 +-
 .../committer/PinotSinkCommitter.java}             | 244 +++++++------
 .../external/EventTimeExtractor.java}              |  54 +--
 .../external/JsonSerializer.java}                  |  33 +-
 .../writer/PinotWriter.java}                       | 163 +++++----
 .../pinot/{ => v2}/writer/PinotWriterSegment.java  |  10 +-
 .../writer/PinotWriterState.java}                  |   6 +-
 .../writer/PinotWriterStateSerializer.java}        |  15 +-
 .../connectors/pinot/PinotClusterContainer.java    | 131 +++++++
 .../connectors/pinot/PinotTestHelper.java          |   6 +-
 .../connectors/pinot/{ => v2}/PinotSinkTest.java   |  50 ++-
 .../connectors/pinot/{ => v2}/PinotTestBase.java   |  65 ++--
 .../src/test/resources/log4j.properties            |  13 +
 .../src/test/resources/log4j2-test.properties      |  46 +++
 .../pinot-broker.conf}                             |  12 +-
 .../pinot-controller.conf}                         |  16 +-
 .../pinot-server.conf}                             |  15 +-
 flink-connector-redis/pom.xml                      |   8 +-
 .../connectors/redis/RedisITCaseBase.java          |   2 +-
 flink-library-siddhi/pom.xml                       |   4 +-
 pom.xml                                            |  14 +-
 108 files changed, 1383 insertions(+), 1496 deletions(-)

diff --git a/.github/workflows/maven-ci.yml b/.github/workflows/maven-ci.yml
index 2b41574..ef344b9 100644
--- a/.github/workflows/maven-ci.yml
+++ b/.github/workflows/maven-ci.yml
@@ -28,21 +28,30 @@ jobs:
 
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
        java: ['8', '11']
-       flink-version: ['1.14.6']
-       scala-version: ['2.12']
+       flink-version: ['1.15.3']
+       connector: [
+         'flink-connector-activemq',
+         'flink-connector-akka',
+         'flink-connector-flume',
+         'flink-connector-influxdb',
+         'flink-connector-influxdb2',
+         'flink-connector-kudu',
+         'flink-connector-netty',
+         'flink-connector-pinot',
+         'flink-connector-redis',
+         'flink-library-siddhi'
+       ]
 
     steps:
-    - uses: actions/checkout@v2
+    - uses: actions/checkout@v3
     - name: Set up JDK ${{ matrix.java }}
-      uses: actions/setup-java@v2
+      uses: actions/setup-java@v3
       with:
         java-version: ${{ matrix.java }}
-        distribution: 'adopt'
+        distribution: 'zulu'
         cache: maven 
-    - name: Change scala version to ${{ matrix.scala-version }}
-      run: ./dev/change-scala-version.sh ${{ matrix.scala-version }}
-      shell: bash
     - name: Build with flink ${{ matrix.flink-version }}
-      run: mvn -B clean verify -Dscala-${{ matrix.scala-version }} -Dflink.version=${{ matrix.flink-version }}
+      run: mvn -B clean verify -pl ${{ matrix.connector }} -am -Dscala-2.12 -Dflink.version=${{ matrix.flink-version }}
diff --git a/flink-connector-activemq/pom.xml b/flink-connector-activemq/pom.xml
index f70e158..cde65b6 100644
--- a/flink-connector-activemq/pom.xml
+++ b/flink-connector-activemq/pom.xml
@@ -92,16 +92,16 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-java</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-java</artifactId>
             <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <artifactId>flink-test-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/flink-connector-akka/pom.xml b/flink-connector-akka/pom.xml
index b929dce..53da588 100644
--- a/flink-connector-akka/pom.xml
+++ b/flink-connector-akka/pom.xml
@@ -63,7 +63,7 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-java</artifactId>
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index e900aac..46f3b1a 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -51,11 +51,11 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<artifactId>flink-clients</artifactId>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<artifactId>flink-streaming-java</artifactId>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
diff --git a/flink-connector-influxdb/pom.xml b/flink-connector-influxdb/pom.xml
index c272e85..1108cfa 100644
--- a/flink-connector-influxdb/pom.xml
+++ b/flink-connector-influxdb/pom.xml
@@ -49,11 +49,11 @@ under the License.
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-java</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <artifactId>flink-test-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>org.influxdb</groupId>
diff --git a/flink-connector-influxdb2/pom.xml b/flink-connector-influxdb2/pom.xml
index 74c8fd1..48f26df 100644
--- a/flink-connector-influxdb2/pom.xml
+++ b/flink-connector-influxdb2/pom.xml
@@ -93,12 +93,12 @@ under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <artifactId>flink-streaming-java</artifactId>
       <type>test-jar</type>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+      <artifactId>flink-test-utils</artifactId>
     </dependency>
 
     <dependency>
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
index 5e3f86a..966c5f0 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/DataPoint.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.connectors.influxdb.common;
 import com.influxdb.Arguments;
 import com.influxdb.client.domain.WritePrecision;
 import com.influxdb.client.write.Point;
+
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import javax.annotation.Nullable;
 
 /**
  * InfluxDB data point class.
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
index fd91046..0051806 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParser.java
@@ -17,10 +17,6 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.common;
 
-import java.text.ParseException;
-import java.util.List;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
 import org.antlr.v4.runtime.ANTLRInputStream;
 import org.antlr.v4.runtime.CharStream;
 import org.antlr.v4.runtime.CommonTokenStream;
@@ -30,6 +26,11 @@ import org.apache.druid.data.input.influx.InfluxLineProtocolParser;
 import org.apache.druid.data.input.influx.InfluxLineProtocolParser.TimestampContext;
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
+import java.text.ParseException;
+import java.util.List;
+import java.util.regex.Pattern;
+
 /**
  * This is an InfluxDB line protocol parser.
  *
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
index 1b6df26..705e5a0 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSink.java
@@ -18,8 +18,6 @@
 package org.apache.flink.streaming.connectors.influxdb.sink;
 
 import com.influxdb.client.write.Point;
-import java.util.List;
-import java.util.Optional;
 import org.apache.flink.api.connector.sink.Committer;
 import org.apache.flink.api.connector.sink.GlobalCommitter;
 import org.apache.flink.api.connector.sink.Sink;
@@ -32,6 +30,9 @@ import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBPointS
 import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
 import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
 
+import java.util.List;
+import java.util.Optional;
+
 /**
  * This Sink implementation of InfluxDB/Line Protocol. Please use a {@link InfluxDBSinkBuilder} to
  * construct a {@link InfluxDBSink}. The following example shows how to create an InfluxDBSink
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
index 6b9fb5e..e1a0105 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
@@ -17,21 +17,14 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.sink;
 
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_TOKEN;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
 import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
 
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.*;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The @builder class for {@link InfluxDBSink} to make it easier for the users to construct a {@link
  * InfluxDBSink}.
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
index 9f11662..44673ec 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommittableSerializer.java
@@ -17,10 +17,11 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
 
-import java.nio.ByteBuffer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import java.nio.ByteBuffer;
+
 /**
  * This class Serialize and deserializes the commit values. Since we are sending the timestamp value
  * as a committable the Long object is (de)serialized.
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
index 3872eed..2ef038d 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/commiter/InfluxDBCommitter.java
@@ -17,22 +17,23 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.sink.commiter;
 
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
-
 import com.influxdb.client.InfluxDBClient;
 import com.influxdb.client.WriteApi;
 import com.influxdb.client.domain.WritePrecision;
 import com.influxdb.client.write.Point;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink.Committer;
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+
 /**
  * The InfluxDBCommitter implements the {@link Committer} interface The InfluxDBCommitter is called
  * whenever a checkpoint is set by Flink. When this class is called it writes a checkpoint data
@@ -89,7 +90,7 @@ public final class InfluxDBCommitter implements Committer<Long> {
         try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
             final Point point = new Point("checkpoint");
             point.addField("checkpoint", "flink");
-            timestamp.ifPresent(aTime -> point.time(aTime, WritePrecision.NS));
+            timestamp.ifPresent(aTime -> point.time(aTime, WritePrecision.MS));
             writeApi.writePoint(point);
             LOG.debug("Checkpoint data point write at {}", point.toLineProtocol());
         }
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java
index 4be9b7f..5ca1590 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBPointSerializer.java
@@ -18,15 +18,16 @@
 package org.apache.flink.streaming.connectors.influxdb.sink.writer;
 
 import com.influxdb.client.write.Point;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
 
 @Internal
 public final class InfluxDBPointSerializer implements SimpleVersionedSerializer<Point> {
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
index dff4a65..3871590 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
@@ -18,9 +18,10 @@
 package org.apache.flink.streaming.connectors.influxdb.sink.writer;
 
 import com.influxdb.client.write.Point;
+import org.apache.flink.api.connector.sink.SinkWriter.Context;
+
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.flink.api.connector.sink.SinkWriter.Context;
 
 public interface InfluxDBSchemaSerializer<IN> extends Serializable {
 
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
index 5826c5c..bf3778b 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
@@ -17,17 +17,9 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.sink.writer;
 
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
-
 import com.influxdb.client.InfluxDBClient;
 import com.influxdb.client.WriteApi;
 import com.influxdb.client.write.Point;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
 import org.apache.flink.api.connector.sink.SinkWriter;
@@ -35,6 +27,13 @@ import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.*;
+
 /**
  * This Class implements the {@link SinkWriter} and it is responsible to write incoming inputs to
  * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
@@ -78,7 +77,6 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
      * @param context current Flink context
      * @see org.apache.flink.api.connector.sink.SinkWriter.Context
      */
-    @Override
     public void write(final IN in, final Context context) throws IOException {
         LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
         this.elements.add(this.schemaSerializer.serialize(in, context));
@@ -87,6 +85,7 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
             this.writeCurrentElements();
             this.elements.clear();
         }
+
         if (context.timestamp() != null) {
             this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
         }
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSink.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSink.java
new file mode 100644
index 0000000..9a46d05
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSink.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink2;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBWriter;
+
+import java.io.IOException;
+
+public class InfluxDBSink<IN> implements Sink<IN> {
+
+    private final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
+    private final Configuration configuration;
+
+    InfluxDBSink(
+            final InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer,
+            final Configuration configuration) {
+        this.influxDBSchemaSerializer = influxDBSchemaSerializer;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Get a influxDBSinkBuilder to build a {@link InfluxDBSink}.
+     *
+     * @return a InfluxDB sink builder.
+     */
+    public static <IN> InfluxDBSinkBuilder<IN> builder() {
+        return new InfluxDBSinkBuilder<>();
+    }
+
+
+    @Override
+    public SinkWriter<IN> createWriter(InitContext initContext) throws IOException {
+        final InfluxDBWriter<IN> writer =
+                new InfluxDBWriter<>(this.influxDBSchemaSerializer, this.configuration);
+        writer.setProcessingTimerService(initContext.getProcessingTimeService());
+        return writer;
+    }
+
+    public Configuration getConfiguration() {
+        return this.configuration;
+    }
+
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkBuilder.java
similarity index 87%
copy from flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
copy to flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkBuilder.java
index 6b9fb5e..e2e97f6 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilder.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkBuilder.java
@@ -15,26 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.influxdb.sink;
-
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_TOKEN;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+package org.apache.flink.streaming.connectors.influxdb.sink2;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
-import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBWriter;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSinkOptions.*;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The @builder class for {@link InfluxDBSink} to make it easier for the users to construct a {@link
- * InfluxDBSink}.
+ * The @builder class for {@link org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSink} to make it easier for the users to construct a {@link
+ * org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSink}.
  *
  * <p>The following example shows the minimum setup to create a InfluxDBSink that uses the Long
  * values from a former operator and sends it to an InfluxDB instance.
@@ -191,7 +184,7 @@ public final class InfluxDBSinkBuilder<IN> {
     }
 
     /**
-     * Build the {@link InfluxDBSink}.
+     * Build the {@link org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSink}.
      *
      * @return a InfluxDBSink with the settings made for this builder.
      */
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkOptions.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkOptions.java
new file mode 100644
index 0000000..7508434
--- /dev/null
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.influxdb.sink2;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.InfluxDBClientOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+public final class InfluxDBSinkOptions {
+
+    private InfluxDBSinkOptions() {}
+
+    public static final ConfigOption<Boolean> WRITE_DATA_POINT_CHECKPOINT =
+            ConfigOptions.key("sink.influxDB.write.data_point.checkpoint")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Determines if the checkpoint data point should be written to InfluxDB or not.");
+
+    public static final ConfigOption<Integer> WRITE_BUFFER_SIZE =
+            ConfigOptions.key("sink.influxDB.write.buffer.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "Size of the buffer to store the data before writing to InfluxDB.");
+
+    public static final ConfigOption<String> INFLUXDB_URL =
+            ConfigOptions.key("sink.influxDB.client.URL")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB Connection URL.");
+
+    public static final ConfigOption<String> INFLUXDB_USERNAME =
+            ConfigOptions.key("sink.influxDB.client.username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB username.");
+
+    public static final ConfigOption<String> INFLUXDB_PASSWORD =
+            ConfigOptions.key("sink.influxDB.client.password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB password.");
+
+    public static final ConfigOption<String> INFLUXDB_TOKEN =
+            ConfigOptions.key("sink.influxDB.client.token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB access token.");
+
+    public static final ConfigOption<String> INFLUXDB_BUCKET =
+            ConfigOptions.key("sink.influxDB.client.bucket")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB bucket name.");
+
+    public static final ConfigOption<String> INFLUXDB_ORGANIZATION =
+            ConfigOptions.key("sink.influxDB.client.organization")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("InfluxDB organization name.");
+
+    public static InfluxDBClient getInfluxDBClient(final Configuration configuration) {
+        final String url = configuration.getString(INFLUXDB_URL);
+        final String username = configuration.getString(INFLUXDB_USERNAME);
+        final String password = configuration.getString(INFLUXDB_PASSWORD);
+        final String token = configuration.getString(INFLUXDB_TOKEN);
+        final String bucket = configuration.getString(INFLUXDB_BUCKET);
+        final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
+        InfluxDBClientOptions.Builder builder = InfluxDBClientOptions.builder();
+        builder = builder
+                .url(url)
+                .bucket(bucket)
+                .org(organization);
+        if (token != null) {
+            builder = builder.authenticateToken(token.toCharArray());
+        } else if (username != null && password != null) {
+            builder = builder.authenticate(username, password.toCharArray());
+        }
+        final InfluxDBClientOptions influxDBClientOptions = builder.build();
+        return InfluxDBClientFactory.create(influxDBClientOptions);
+    }
+}
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBSchemaSerializer.java
similarity index 90%
copy from flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
copy to flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBSchemaSerializer.java
index dff4a65..d8f1a69 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBSchemaSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBSchemaSerializer.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.influxdb.sink.writer;
+package org.apache.flink.streaming.connectors.influxdb.sink2.writer;
 
 import com.influxdb.client.write.Point;
+import org.apache.flink.api.connector.sink2.SinkWriter.Context;
+
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.flink.api.connector.sink.SinkWriter.Context;
 
 public interface InfluxDBSchemaSerializer<IN> extends Serializable {
 
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBWriter.java
similarity index 63%
copy from flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
copy to flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBWriter.java
index 5826c5c..10797ec 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBWriter.java
@@ -15,39 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.influxdb.sink.writer;
-
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+package org.apache.flink.streaming.connectors.influxdb.sink2.writer;
 
 import com.influxdb.client.InfluxDBClient;
 import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
 import com.influxdb.client.write.Point;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
 
-/**
- * This Class implements the {@link SinkWriter} and it is responsible to write incoming inputs to
- * InfluxDB. It uses the {@link InfluxDBSchemaSerializer} to serialize the input into a {@link
- * Point} object. Each serialized object is stored in the {@link #elements} list. Whenever the size
- * of the list reaches the {@link #bufferSize}, the influxDB write API is called and all the items
- * all written. The {@link #lastTimestamp} keeps track of the biggest timestamp of the incoming
- * elements.
- *
- * @param <IN> Type of the input
- * @see WriteApi
- */
-@Internal
-public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
+import static org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSinkOptions.*;
+
+public class InfluxDBWriter<IN> implements SinkWriter<IN> {
 
     private static final Logger LOG = LoggerFactory.getLogger(InfluxDBWriter.class);
 
@@ -69,6 +57,10 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
         this.influxDBClient = getInfluxDBClient(configuration);
     }
 
+    public void setProcessingTimerService(final ProcessingTimeService processingTimerService) {
+        this.processingTimerService = processingTimerService;
+    }
+
     /**
      * This method calls the InfluxDB write API whenever the element list reaches the {@link
      * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
@@ -76,60 +68,72 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
      *
      * @param in incoming data
      * @param context current Flink context
-     * @see org.apache.flink.api.connector.sink.SinkWriter.Context
+     * @see org.apache.flink.api.connector.sink2.SinkWriter.Context
      */
     @Override
-    public void write(final IN in, final Context context) throws IOException {
+    public void write(IN in, Context context) throws IOException, InterruptedException {
         LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
         this.elements.add(this.schemaSerializer.serialize(in, context));
+
         if (this.elements.size() == this.bufferSize) {
             LOG.debug("Buffer size reached preparing to write the elements.");
             this.writeCurrentElements();
-            this.elements.clear();
         }
         if (context.timestamp() != null) {
             this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
         }
+
     }
 
-    /**
-     * This method is called whenever a checkpoint is set by Flink. It creates a list and fills it
-     * up with the latest timestamp.
-     *
-     * @param flush
-     * @return A list containing 0 or 1 element
-     */
     @Override
-    public List<Long> prepareCommit(final boolean flush) {
-        if (this.lastTimestamp == 0) {
-            return Collections.emptyList();
+    public void flush(boolean flush) throws IOException, InterruptedException {
+        if (this.lastTimestamp == 0) return;
+
+        commit(Collections.singletonList(this.lastTimestamp));
+    }
+
+    public void commit(final List<Long> committables) {
+        if (this.writeCheckpoint) {
+            LOG.debug("A checkpoint is set.");
+            Optional<Long> lastTimestamp = Optional.empty();
+            if (committables.size() >= 1) {
+                lastTimestamp = Optional.ofNullable(committables.get(committables.size() - 1));
+            }
+            lastTimestamp.ifPresent(this::writeCheckpointDataPoint);
         }
-        final List<Long> lastTimestamp = new ArrayList<>(1);
-        lastTimestamp.add(this.lastTimestamp);
-        return lastTimestamp;
     }
 
-    @Override
-    public List<Point> snapshotState() {
-        return this.elements;
+    private void writeCheckpointDataPoint(final Long timestamp) {
+        final Point point = new Point("checkpoint")
+                .addField("checkpoint", "flink")
+                .time(timestamp, WritePrecision.MS);
+
+        writeElementsOf(Collections.singletonList(point));
+        LOG.debug("Checkpoint data point write at {}", point.toLineProtocol());
     }
 
     @Override
     public void close() throws Exception {
         LOG.debug("Preparing to write the elements in InfluxDB.");
         this.writeCurrentElements();
+
         LOG.debug("Closing the writer.");
-        this.elements.clear();
+        this.influxDBClient.close();
     }
 
-    public void setProcessingTimerService(final ProcessingTimeService processingTimerService) {
-        this.processingTimerService = processingTimerService;
+    private void writeCurrentElements() {
+        writeElementsOf(this.elements);
+        this.elements.clear();
     }
 
-    private void writeCurrentElements() {
+    private void writeElementsOf(List<Point> toWrite) {
+        if (toWrite.isEmpty()) return;
+
         try (final WriteApi writeApi = this.influxDBClient.getWriteApi()) {
-            writeApi.writePoints(this.elements);
-            LOG.debug("Wrote {} data points", this.elements.size());
+            writeApi.writePoints(toWrite);
+            LOG.debug("Wrote {} data points", toWrite.size());
+        } catch (Exception e) {
+            e.printStackTrace();
         }
     }
 }
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
index fb55eef..7c17035 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSource.java
@@ -17,14 +17,8 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.source;
 
-import java.util.function.Supplier;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.connector.source.SplitEnumerator;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.*;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -38,6 +32,8 @@ import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer
 import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
 import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplitSerializer;
 
+import java.util.function.Supplier;
+
 /**
  * The Source implementation of InfluxDB. Please use a {@link InfluxDBSourceBuilder} to construct a
  * {@link InfluxDBSource}. The following example shows how to create an InfluxDBSource emitting
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
index 4ff9c5c..9b05307 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilder.java
@@ -17,11 +17,11 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.source;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The @builder class for {@link InfluxDBSource} to make it easier for the users to construct a
  * {@link InfluxDBSource}.
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
index 9ac9b23..54373c9 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/enumerator/InfluxDBSplitEnumerator.java
@@ -17,15 +17,16 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.source.enumerator;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.util.List;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** The enumerator class for InfluxDB source. */
 @Internal
 public final class InfluxDBSplitEnumerator
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
index e71b684..12d1431 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/Handler.java
@@ -19,11 +19,12 @@ package org.apache.flink.streaming.connectors.influxdb.source.http;
 
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
-import java.io.IOException;
-import java.io.OutputStream;
 import org.apache.flink.annotation.Internal;
 import org.jetbrains.annotations.NotNull;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
 /** Abstract base handle class for creating a response */
 @Internal
 abstract class Handler implements HttpHandler {
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
index ec4d45d..7daee00 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/HealthCheckHandler.java
@@ -18,9 +18,10 @@
 package org.apache.flink.streaming.connectors.influxdb.source.http;
 
 import com.sun.net.httpserver.HttpExchange;
+import org.apache.flink.annotation.Internal;
+
 import java.io.IOException;
 import java.net.HttpURLConnection;
-import org.apache.flink.annotation.Internal;
 
 /**
  * Handles incoming health check requests from /health path. If the server is running a response
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
index 1c2bba0..c9752ed 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/http/WriteAPIHandler.java
@@ -18,6 +18,13 @@
 package org.apache.flink.streaming.connectors.influxdb.source.http;
 
 import com.sun.net.httpserver.HttpExchange;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
+import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -30,12 +37,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
-import org.apache.flink.streaming.connectors.influxdb.common.InfluxParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class handles the incoming requests through the path /api/v2/write. The handle function
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
index 1612bc5..08e98d5 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBRecordEmitter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.source.reader;
 
-import java.io.IOException;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -25,6 +24,8 @@ import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
 import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
 import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
 
+import java.io.IOException;
+
 @Internal
 public final class InfluxDBRecordEmitter<T> implements RecordEmitter<DataPoint, T, InfluxDBSplit> {
     private final InfluxDBDataPointDeserializer<T> dataPointDeserializer;
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java
index 1b75b84..d1bf098 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSourceReader.java
@@ -17,8 +17,6 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.source.reader;
 
-import java.util.Map;
-import java.util.function.Supplier;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
@@ -27,6 +25,9 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
 import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
 import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
 
+import java.util.Map;
+import java.util.function.Supplier;
+
 /** The source reader for the InfluxDB line protocol. */
 @Internal
 public final class InfluxDBSourceReader<OUT>
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
index ddfcb33..7b19d6c 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBSplitReader.java
@@ -17,21 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.source.reader;
 
-import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.ENQUEUE_WAIT_TIME;
-import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.INGEST_QUEUE_CAPACITY;
-import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.MAXIMUM_LINES_PER_REQUEST;
-import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.PORT;
-
 import com.sun.net.httpserver.HttpServer;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -43,6 +29,14 @@ import org.apache.flink.streaming.connectors.influxdb.source.http.HealthCheckHan
 import org.apache.flink.streaming.connectors.influxdb.source.http.WriteAPIHandler;
 import org.apache.flink.streaming.connectors.influxdb.source.split.InfluxDBSplit;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSourceOptions.*;
+
 /**
  * A {@link SplitReader} implementation that reads records from InfluxDB splits.
  *
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
index 0773b2a..9dc9658 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/reader/deserializer/InfluxDBDataPointDeserializer.java
@@ -17,13 +17,14 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer;
 
-import java.io.IOException;
-import java.io.Serializable;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
 
+import java.io.IOException;
+import java.io.Serializable;
+
 /** An interface for the deserialization of InfluxDB data points. */
 public interface InfluxDBDataPointDeserializer<OUT> extends Serializable, ResultTypeQueryable<OUT> {
 
diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
index 4d1da83..624a27f 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/source/split/InfluxDBSplitSerializer.java
@@ -17,10 +17,11 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.source.split;
 
-import java.nio.ByteBuffer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import java.nio.ByteBuffer;
+
 /**
  * The {@link org.apache.flink.core.io.SimpleVersionedSerializer serializer} for {@link
  * InfluxDBSplit}.
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java
index 71d9529..172d77f 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/common/InfluxParserTest.java
@@ -17,16 +17,14 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.common;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.text.ParseException;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.text.ParseException;
+
+import static org.junit.jupiter.api.Assertions.*;
+
 class InfluxParserTest {
 
     @Test
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
index f8dc31c..1138aa4 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkBuilderTest.java
@@ -17,12 +17,13 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.sink;
 
-import static org.junit.jupiter.api.Assertions.*;
-
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBContainer;
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestSerializer;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBTestSerializer;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
 class InfluxDBSinkBuilderTest {
 
     @Test
@@ -33,10 +34,10 @@ class InfluxDBSinkBuilderTest {
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
-                                        .setInfluxDBUsername(InfluxDBContainer.username)
-                                        .setInfluxDBPassword(InfluxDBContainer.password)
-                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
-                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBUsername("username")
+                                        .setInfluxDBPassword("password")
+                                        .setInfluxDBBucket("bucket")
+                                        .setInfluxDBOrganization("organization")
                                         .build());
         assertEquals(exception.getMessage(), "The InfluxDB URL is required but not provided.");
     }
@@ -49,9 +50,9 @@ class InfluxDBSinkBuilderTest {
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
-                                        .setInfluxDBPassword(InfluxDBContainer.password)
-                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
-                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBPassword("password")
+                                        .setInfluxDBBucket("bucket")
+                                        .setInfluxDBOrganization("organization")
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                                         .build());
         assertEquals(exception.getMessage(),
@@ -66,9 +67,9 @@ class InfluxDBSinkBuilderTest {
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
-                                        .setInfluxDBUsername(InfluxDBContainer.username)
-                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
-                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBUsername("username")
+                                        .setInfluxDBBucket("bucket")
+                                        .setInfluxDBOrganization("organization")
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                                         .build());
         assertEquals(exception.getMessage(),
@@ -83,11 +84,11 @@ class InfluxDBSinkBuilderTest {
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
-                                        .setInfluxDBToken(InfluxDBContainer.token)
-                                        .setInfluxDBUsername(InfluxDBContainer.username)
-                                        .setInfluxDBPassword(InfluxDBContainer.password)
-                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
-                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBToken("token")
+                                        .setInfluxDBUsername("username")
+                                        .setInfluxDBPassword("password")
+                                        .setInfluxDBBucket("bucket")
+                                        .setInfluxDBOrganization("organization")
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                                         .build());
         assertEquals(exception.getMessage(),
@@ -102,9 +103,9 @@ class InfluxDBSinkBuilderTest {
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
-                                        .setInfluxDBUsername(InfluxDBContainer.username)
-                                        .setInfluxDBPassword(InfluxDBContainer.password)
-                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBUsername("username")
+                                        .setInfluxDBPassword("password")
+                                        .setInfluxDBOrganization("organization")
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                                         .build());
         assertEquals(exception.getMessage(), "The Bucket name is required but not provided.");
@@ -118,9 +119,9 @@ class InfluxDBSinkBuilderTest {
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
-                                        .setInfluxDBUsername(InfluxDBContainer.username)
-                                        .setInfluxDBPassword(InfluxDBContainer.password)
-                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
+                                        .setInfluxDBUsername("username")
+                                        .setInfluxDBPassword("password")
+                                        .setInfluxDBBucket("bucket")
                                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                                         .build());
         assertEquals(exception.getMessage(), "The Organization name is required but not provided.");
@@ -134,10 +135,10 @@ class InfluxDBSinkBuilderTest {
                         () ->
                                 InfluxDBSink.builder()
                                         .setInfluxDBUrl("http://localhost:8086")
-                                        .setInfluxDBUsername(InfluxDBContainer.username)
-                                        .setInfluxDBPassword(InfluxDBContainer.password)
-                                        .setInfluxDBBucket(InfluxDBContainer.bucket)
-                                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                                        .setInfluxDBUsername("username")
+                                        .setInfluxDBPassword("password")
+                                        .setInfluxDBBucket("bucket")
+                                        .setInfluxDBOrganization("organization")
                                         .build());
         assertEquals(exception.getMessage(), "Serialization schema is required but not provided.");
     }
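
For reference, each builder call in this test omits exactly one required option. The successful-path counterpart, assembled from the same setters with the same placeholder values (none of the literals are connector defaults), is a fragment along these lines:

    // All required options supplied, so build() is expected to succeed.
    InfluxDBSink.builder()
            .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
            .setInfluxDBUrl("http://localhost:8086")
            .setInfluxDBUsername("username")
            .setInfluxDBPassword("password")
            .setInfluxDBBucket("bucket")
            .setInfluxDBOrganization("organization")
            .build();
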
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkIntegrationTestCase.java
similarity index 85%
copy from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
copy to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkIntegrationTestCase.java
index 138b017..76b4751 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/InfluxDBSinkIntegrationTestCase.java
@@ -15,41 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.influxdb;
-
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+package org.apache.flink.streaming.connectors.influxdb.sink;
 
 import com.influxdb.client.InfluxDBClient;
 import com.influxdb.client.domain.WritePrecision;
 import com.influxdb.client.write.Point;
 import com.influxdb.query.FluxRecord;
 import com.influxdb.query.FluxTable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSink;
+import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBTestSerializer;
 import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBContainer;
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestSerializer;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.util.TestLogger;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 @Testcontainers
 class InfluxDBSinkIntegrationTestCase extends TestLogger {
 
     @Container
-    public static final InfluxDBContainer<?> influxDBContainer =
+    public static final InfluxDBContainer influxDBContainer =
             InfluxDBContainer.createWithDefaultTag();
 
     private static final List<Long> SOURCE_DATA = Arrays.asList(1L, 2L, 3L);
@@ -81,15 +80,17 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
                 InfluxDBSink.builder()
                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                         .setInfluxDBUrl(influxDBContainer.getUrl())
-                        .setInfluxDBUsername(InfluxDBContainer.username)
-                        .setInfluxDBPassword(InfluxDBContainer.password)
-                        .setInfluxDBBucket(InfluxDBContainer.bucket)
-                        .setInfluxDBOrganization(InfluxDBContainer.organization)
+                        .setInfluxDBUsername(influxDBContainer.getUsername())
+                        .setInfluxDBPassword(influxDBContainer.getPassword())
+                        .setInfluxDBBucket(influxDBContainer.getBucket())
+                        .setInfluxDBOrganization(influxDBContainer.getOrganization())
                         .setWriteBufferSize(2)
                         .addCheckpointDataPoint(true)
                         .build();
 
-        env.addSource(new FiniteTestSource(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO)
+        env.addSource(new FiniteTestSource<>(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO)
+                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>noWatermarks()
+                        .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis()))
                 .sinkTo(influxDBSink);
 
         env.execute();
@@ -100,7 +101,9 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
         assertEquals(actualWrittenPoints.size(), EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.size());
 
         final List<String> actualCheckpoints = queryCheckpoints(client);
-        assertTrue(actualCheckpoints.size() >= 4);
+
+        // Checkpoints are triggered 4 (or more) times, but as the watermark never changes this should yield 2 effective data points
+        assertEquals(2, actualCheckpoints.size());
     }
 
     // ---------------- private helper methods --------------------
@@ -121,7 +124,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
                         "from(bucket: \"%s\") |> "
                                 + "range(start: -1h) |> "
                                 + "filter(fn:(r) => r._measurement == \"test\")",
-                        InfluxDBContainer.bucket);
+                        influxDBContainer.getBucket());
         final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
         for (final FluxTable table : tables) {
             for (final FluxRecord record : table.getRecords()) {
@@ -139,7 +142,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
                         "from(bucket: \"%s\") |> "
                                 + "range(start: -1h) |> "
                                 + "filter(fn:(r) => r._measurement == \"checkpoint\")",
-                        InfluxDBContainer.bucket);
+                        influxDBContainer.getBucket());
 
         final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
         for (final FluxTable table : tables) {
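
The new assignTimestampsAndWatermarks(...) call is what the tightened checkpoint assertion relies on: per the comment above, the watermark never changes, so only two effective checkpoint data points are expected. Pulled out of the test, the wiring is roughly the sketch below (same classes and calls as in the hunk; buildInput and the inline element list are just names and values for this sketch):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.util.FiniteTestSource;

    import java.util.Arrays;

    // No watermarks are generated; each record only gets a processing-time event timestamp.
    static DataStream<Long> buildInput(final StreamExecutionEnvironment env) {
        return env.addSource(new FiniteTestSource<>(Arrays.asList(1L, 2L, 3L)), BasicTypeInfo.LONG_TYPE_INFO)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Long>noWatermarks()
                                .withTimestampAssigner((element, previousTimestamp) -> System.currentTimeMillis()));
    }
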
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBTestSerializer.java
similarity index 89%
copy from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
copy to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBTestSerializer.java
index c48ed98..17ada08 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBTestSerializer.java
@@ -15,11 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.influxdb.util;
+package org.apache.flink.streaming.connectors.influxdb.sink.writer;
 
 import com.influxdb.client.write.Point;
 import org.apache.flink.api.connector.sink.SinkWriter.Context;
-import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
 import org.jetbrains.annotations.Nullable;
 
 public class InfluxDBTestSerializer implements InfluxDBSchemaSerializer<Long> {
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkIntegrationTestCase.java
similarity index 82%
rename from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
rename to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkIntegrationTestCase.java
index 138b017..7f35ae5 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/InfluxDBSinkIntegrationTestCase.java
@@ -15,48 +15,45 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.influxdb;
-
-import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.getInfluxDBClient;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+package org.apache.flink.streaming.connectors.influxdb.sink2;
 
 import com.influxdb.client.InfluxDBClient;
 import com.influxdb.client.domain.WritePrecision;
 import com.influxdb.client.write.Point;
 import com.influxdb.query.FluxRecord;
 import com.influxdb.query.FluxTable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSink;
+import org.apache.flink.streaming.connectors.influxdb.sink2.writer.InfluxDBTestSerializer;
 import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBContainer;
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestSerializer;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.util.TestLogger;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.influxdb.sink2.InfluxDBSinkOptions.getInfluxDBClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 @Testcontainers
 class InfluxDBSinkIntegrationTestCase extends TestLogger {
 
     @Container
-    public static final InfluxDBContainer<?> influxDBContainer =
+    public static final InfluxDBContainer influxDBContainer =
             InfluxDBContainer.createWithDefaultTag();
 
     private static final List<Long> SOURCE_DATA = Arrays.asList(1L, 2L, 3L);
 
-    // FiniteTestSource emits list of elements twice
     private static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
-            Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream())
+            SOURCE_DATA.stream()
                     .map(x -> new InfluxDBTestSerializer().serialize(x, null).toLineProtocol())
                     .collect(Collectors.toList());
 
@@ -81,15 +78,16 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
                 InfluxDBSink.builder()
                         .setInfluxDBSchemaSerializer(new InfluxDBTestSerializer())
                         .setInfluxDBUrl(influxDBContainer.getUrl())
-                        .setInfluxDBUsername(InfluxDBContainer.username)
-                        .setInfluxDBPassword(InfluxDBContainer.password)
-                        .setInfluxDBBucket(InfluxDBContainer.bucket)
-                        .setInfluxDBOrganization(InfluxDBContainer.organization)
-                        .setWriteBufferSize(2)
+                        .setInfluxDBUsername(influxDBContainer.getUsername())
+                        .setInfluxDBPassword(influxDBContainer.getPassword())
+                        .setInfluxDBBucket(influxDBContainer.getBucket())
+                        .setInfluxDBOrganization(influxDBContainer.getOrganization())
                         .addCheckpointDataPoint(true)
                         .build();
 
-        env.addSource(new FiniteTestSource(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO)
+        env.addSource(new FiniteTestSource<>(SOURCE_DATA), BasicTypeInfo.LONG_TYPE_INFO)
+                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>noWatermarks()
+                        .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis()))
                 .sinkTo(influxDBSink);
 
         env.execute();
@@ -97,10 +95,12 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
         final InfluxDBClient client = getInfluxDBClient(influxDBSink.getConfiguration());
         final List<String> actualWrittenPoints = queryWrittenData(client);
 
-        assertEquals(actualWrittenPoints.size(), EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.size());
+        assertEquals(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.size(), actualWrittenPoints.size());
 
         final List<String> actualCheckpoints = queryCheckpoints(client);
-        assertTrue(actualCheckpoints.size() >= 4);
+
+        // Checkpoints are triggered 4 (or more) times, but as the watermark never changes this should yield 2 effective data points
+        assertEquals(2, actualCheckpoints.size());
     }
 
     // ---------------- private helper methods --------------------
@@ -121,7 +121,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
                         "from(bucket: \"%s\") |> "
                                 + "range(start: -1h) |> "
                                 + "filter(fn:(r) => r._measurement == \"test\")",
-                        InfluxDBContainer.bucket);
+                        influxDBContainer.getBucket());
         final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
         for (final FluxTable table : tables) {
             for (final FluxRecord record : table.getRecords()) {
@@ -139,7 +139,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
                         "from(bucket: \"%s\") |> "
                                 + "range(start: -1h) |> "
                                 + "filter(fn:(r) => r._measurement == \"checkpoint\")",
-                        InfluxDBContainer.bucket);
+                        influxDBContainer.getBucket());
 
         final List<FluxTable> tables = influxDBClient.getQueryApi().query(query);
         for (final FluxTable table : tables) {
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBTestSerializer.java
similarity index 85%
rename from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
rename to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBTestSerializer.java
index c48ed98..c6a7b55 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestSerializer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/sink2/writer/InfluxDBTestSerializer.java
@@ -15,11 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.influxdb.util;
+package org.apache.flink.streaming.connectors.influxdb.sink2.writer;
 
 import com.influxdb.client.write.Point;
-import org.apache.flink.api.connector.sink.SinkWriter.Context;
-import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
+import org.apache.flink.api.connector.sink2.SinkWriter.Context;
 import org.jetbrains.annotations.Nullable;
 
 public class InfluxDBTestSerializer implements InfluxDBSchemaSerializer<Long> {
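
The sink2 copy of the test serializer now compiles against the Sink V2 SinkWriter.Context. Its body is not part of this hunk, so purely as a hedged illustration of the shape such a serializer takes (the class name and the field name are made up; only the "test" measurement and the serialize(element, context) call are visible elsewhere in this commit):

    import com.influxdb.client.write.Point;
    import org.apache.flink.api.connector.sink2.SinkWriter.Context;

    // Illustrative only: maps each Long element to a point in the "test" measurement.
    public class ExampleLongSerializer implements InfluxDBSchemaSerializer<Long> {
        @Override
        public Point serialize(final Long element, final Context context) {
            return Point.measurement("test").addField("longValue", element);
        }
    }
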
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java
index 4045b8c..71fa714 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceBuilderTest.java
@@ -17,11 +17,11 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.source;
 
+import org.junit.jupiter.api.Test;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-import org.junit.jupiter.api.Test;
-
 class InfluxDBSourceBuilderTest {
     @Test
     void shouldNotBuildSourceWhenSchemaDeserializerIsNotProvided() {
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceIntegrationTestCase.java
similarity index 90%
rename from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.java
rename to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceIntegrationTestCase.java
index 8165533..1ce50e2 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSourceIntegrationTestCase.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/InfluxDBSourceIntegrationTestCase.java
@@ -15,21 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.influxdb;
+package org.apache.flink.streaming.connectors.influxdb.source;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import com.google.api.client.http.ByteArrayContent;
-import com.google.api.client.http.GenericUrl;
-import com.google.api.client.http.HttpBackOffIOExceptionHandler;
-import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
-import com.google.api.client.http.HttpContent;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpRequestFactory;
-import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.*;
 import com.google.api.client.http.javanet.NetHttpTransport;
 import com.google.api.client.util.ExponentialBackOff;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.influxdb.source.reader.InfluxDBTestDeserializer;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.*;
+
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.ServerSocket;
@@ -37,19 +36,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.influxdb.source.InfluxDBSource;
-import org.apache.flink.streaming.connectors.influxdb.util.InfluxDBTestDeserializer;
-import org.apache.flink.util.TestLogger;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Integration test for the InfluxDB source for Flink. */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestDeserializer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBTestDeserializer.java
similarity index 94%
rename from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestDeserializer.java
rename to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBTestDeserializer.java
index d231d89..11d4f8e 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBTestDeserializer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/source/reader/InfluxDBTestDeserializer.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.influxdb.util;
+package org.apache.flink.streaming.connectors.influxdb.source.reader;
 
 import org.apache.flink.streaming.connectors.influxdb.common.DataPoint;
 import org.apache.flink.streaming.connectors.influxdb.source.reader.deserializer.InfluxDBDataPointDeserializer;
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
index b8a0d69..6c7bcb9 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
@@ -17,93 +17,25 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.util;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.io.IOException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
 import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.MountableFile;
 
-public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
-        extends GenericContainer<SELF> {
+public final class InfluxDBContainer extends InfluxDBContainerCustom {
 
     private static final Logger LOG = LoggerFactory.getLogger(InfluxDBContainer.class);
 
-    public static final Integer INFLUXDB_PORT = 8086;
-
-    private static final String REGISTRY = "quay.io";
-    private static final String REPOSITORY = "influxdb/influxdb";
-    private static final String TAG = "v2.0.2";
     private static final DockerImageName DEFAULT_IMAGE_NAME =
-            DockerImageName.parse(String.format("%s/%s:%s", REGISTRY, REPOSITORY, TAG));
-    private static final int NO_CONTENT_STATUS_CODE = 204;
-    private static final String INFLUX_SETUP_SH = "influx-setup.sh";
-
-    public static final String username = "test-user";
-    public static final String password = "test-password";
-    public static final String token = "access-token";
-    public static final String bucket = "test-bucket";
-    public static final String organization = "test-org";
-    private static final int retention = 0;
-    private static final String retentionUnit = "ns";
+            DockerImageName.parse("quay.io/influxdb/influxdb:v2.0.2");
 
     private InfluxDBContainer(final DockerImageName imageName) {
         super(imageName);
-        imageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
-        this.setEnv();
-        this.waitStrategy =
-                (new WaitAllStrategy())
-                        .withStrategy(
-                                Wait.forHttp("/ping")
-                                        .withBasicCredentials(username, password)
-                                        .forStatusCode(NO_CONTENT_STATUS_CODE))
-                        .withStrategy(Wait.forListeningPort());
 
-        this.addExposedPort(INFLUXDB_PORT);
-        this.startContainer();
     }
 
-    public static InfluxDBContainer<?> createWithDefaultTag() {
+    public static InfluxDBContainer createWithDefaultTag() {
         LOG.info("Starting influxDB test container with default tag {}", DEFAULT_IMAGE_NAME);
-        return new InfluxDBContainer<>(DEFAULT_IMAGE_NAME);
-    }
-
-    private void setEnv() {
-        this.addEnv("INFLUXDB_USER", username);
-        this.addEnv("INFLUXDB_PASSWORD", password);
-        this.addEnv("INFLUXDB_BUCKET", bucket);
-        this.addEnv("INFLUXDB_ORG", organization);
-        this.addEnv("INFLUXDB_RETENTION", String.valueOf(retention));
-        this.addEnv("INFLUXDB_RETENTION_UNIT", retentionUnit);
+        return new InfluxDBContainer(DEFAULT_IMAGE_NAME);
     }
 
-    private void startContainer() {
-        this.withCopyFileToContainer(
-                MountableFile.forClasspathResource(INFLUX_SETUP_SH),
-                String.format("%s", INFLUX_SETUP_SH));
-        this.start();
-        this.setUpInfluxDB();
-        LOG.info("Started InfluxDB container on: {}", this.getUrl());
-    }
-
-    private void setUpInfluxDB() {
-        final ExecResult execResult;
-        try {
-            execResult = this.execInContainer("chmod", "-x", String.format("/%s", INFLUX_SETUP_SH));
-            assertEquals(execResult.getExitCode(), 0);
-            final ExecResult writeResult =
-                    this.execInContainer("/bin/bash", String.format("/%s", INFLUX_SETUP_SH));
-            assertEquals(writeResult.getExitCode(), 0);
-        } catch (final InterruptedException | IOException e) {
-            LOG.error("An error occurred while setting up InfluxDB {}", e.getMessage());
-        }
-    }
-
-    public String getUrl() {
-        return "http://" + this.getHost() + ":" + this.getMappedPort(INFLUXDB_PORT);
-    }
 }
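
With this refactor the credentials are no longer public static fields; the tests above read them through the instance getters inherited from InfluxDBContainerCustom. A minimal usage sketch (the test class name is made up, everything else mirrors the integration tests in this commit):

    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;

    @Testcontainers
    class ExampleInfluxDBTest {

        // The parent class starts the container from its constructor, so the instance is
        // ready to use as soon as it is created.
        @Container
        static final InfluxDBContainer influxDB = InfluxDBContainer.createWithDefaultTag();

        // Connection details for the sink/source builders come from the instance getters:
        //   influxDB.getUrl(), influxDB.getUsername(), influxDB.getPassword(),
        //   influxDB.getBucket(), influxDB.getOrganization()
    }
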
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainerCustom.java
similarity index 75%
copy from flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
copy to flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainerCustom.java
index b8a0d69..8950079 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainer.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/util/InfluxDBContainerCustom.java
@@ -17,9 +17,6 @@
  */
 package org.apache.flink.streaming.connectors.influxdb.util;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.io.IOException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.GenericContainer;
@@ -28,30 +25,29 @@ import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
 import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.MountableFile;
 
-public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
-        extends GenericContainer<SELF> {
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class InfluxDBContainerCustom
+        extends GenericContainer<InfluxDBContainerCustom> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(InfluxDBContainer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(InfluxDBContainerCustom.class);
 
     public static final Integer INFLUXDB_PORT = 8086;
 
-    private static final String REGISTRY = "quay.io";
-    private static final String REPOSITORY = "influxdb/influxdb";
     private static final String TAG = "v2.0.2";
     private static final DockerImageName DEFAULT_IMAGE_NAME =
-            DockerImageName.parse(String.format("%s/%s:%s", REGISTRY, REPOSITORY, TAG));
+            DockerImageName.parse("quay.io/influxdb/influxdb:v2.0.2");
     private static final int NO_CONTENT_STATUS_CODE = 204;
     private static final String INFLUX_SETUP_SH = "influx-setup.sh";
 
-    public static final String username = "test-user";
-    public static final String password = "test-password";
-    public static final String token = "access-token";
-    public static final String bucket = "test-bucket";
-    public static final String organization = "test-org";
-    private static final int retention = 0;
-    private static final String retentionUnit = "ns";
+    private static final String username = "test-user";
+    private static final String password = "test-password";
+    private static final String bucket = "test-bucket";
+    private static final String organization = "test-org";
 
-    private InfluxDBContainer(final DockerImageName imageName) {
+    public InfluxDBContainerCustom(final DockerImageName imageName) {
         super(imageName);
         imageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
         this.setEnv();
@@ -67,9 +63,20 @@ public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
         this.startContainer();
     }
 
-    public static InfluxDBContainer<?> createWithDefaultTag() {
-        LOG.info("Starting influxDB test container with default tag {}", DEFAULT_IMAGE_NAME);
-        return new InfluxDBContainer<>(DEFAULT_IMAGE_NAME);
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    public String getOrganization() {
+        return organization;
     }
 
     private void setEnv() {
@@ -77,8 +84,6 @@ public final class InfluxDBContainer<SELF extends InfluxDBContainer<SELF>>
         this.addEnv("INFLUXDB_PASSWORD", password);
         this.addEnv("INFLUXDB_BUCKET", bucket);
         this.addEnv("INFLUXDB_ORG", organization);
-        this.addEnv("INFLUXDB_RETENTION", String.valueOf(retention));
-        this.addEnv("INFLUXDB_RETENTION_UNIT", retentionUnit);
     }
 
     private void startContainer() {
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 20b16b4..134d6f7 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -36,13 +36,6 @@
 
   <dependencyManagement>
     <dependencies>
-      <dependency>
-        <groupId>org.apache.kudu</groupId>
-        <artifactId>kudu-binary</artifactId>
-        <version>${kudu.version}</version>
-        <classifier>${os.detected.classifier}</classifier>
-        <scope>test</scope>
-      </dependency>
       <dependency>
         <groupId>org.apache.kudu</groupId>
         <artifactId>kudu-client</artifactId>
@@ -85,11 +78,11 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <artifactId>flink-clients</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <artifactId>flink-streaming-java</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
@@ -99,11 +92,6 @@
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.kudu</groupId>
-      <artifactId>kudu-binary</artifactId>
-      <classifier>${os.detected.classifier}</classifier>
-    </dependency>
     <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-client</artifactId>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
index b178308..4997938 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
@@ -18,7 +18,6 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.ColumnSchema;
 
 import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
index 4a475e9..fd9bfa4 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
@@ -18,7 +18,6 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.client.CreateTableOptions;
 
 import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index e7a8d16..94e2e26 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -18,7 +18,6 @@ package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.data.binary.BinaryStringData;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduPredicate;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index baae8a0..655a914 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -16,9 +16,8 @@
  */
 package org.apache.flink.connectors.kudu.connector;
 
-import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
index b7dc702..5196d7f 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
@@ -17,11 +17,7 @@
 
 package org.apache.flink.connectors.kudu.connector.convertor;
 
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.*;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.RowResult;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
index 3c8954f..c67c6ed 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
@@ -17,7 +17,6 @@
 package org.apache.flink.connectors.kudu.connector.failure;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.client.RowError;
 
 import java.io.IOException;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
index 6816fc3..72b734c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -16,18 +16,12 @@
  */
 package org.apache.flink.connectors.kudu.connector.reader;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanToken;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
index 468cb1e..4727488 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
@@ -16,9 +16,8 @@
  */
 package org.apache.flink.connectors.kudu.connector.reader;
 
-import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.Serializable;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
index d9f8219..794d56b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
@@ -17,7 +17,6 @@
 package org.apache.flink.connectors.kudu.connector.writer;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.PartialRow;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
index 4878ab3..886a3ea 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
@@ -17,7 +17,6 @@
 package org.apache.flink.connectors.kudu.connector.writer;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 59ad196..2171a43 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -21,14 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
 import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
-
-import org.apache.kudu.client.DeleteTableResponse;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
index 6c6d216..9b63494 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -16,9 +16,8 @@
  */
 package org.apache.flink.connectors.kudu.connector.writer;
 
-import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.kudu.client.AsyncKuduClient;
 
 import java.io.Serializable;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
index db44eec..253146c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
@@ -19,11 +19,7 @@ package org.apache.flink.connectors.kudu.connector.writer;
 import org.apache.flink.annotation.PublicEvolving;
 
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 @PublicEvolving
 public class PojoOperationMapper<T> extends AbstractSingleOperationMapper<T> {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
index 900515d..bc1b031 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
@@ -28,7 +28,6 @@ import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
index a671408..a2e3e47 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
index 2e1c63e..ffe02a8 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
@@ -19,26 +19,8 @@
 package org.apache.flink.connectors.kudu.table;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
index 734e219..20ec341 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -21,29 +21,15 @@ package org.apache.flink.connectors.kudu.table;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
-import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.StringUtils;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.KuduClient;
@@ -53,19 +39,10 @@ import org.apache.kudu.shaded.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_REPLICAS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.*;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -92,7 +69,7 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
      * @param kuduMasters Connection address to Kudu
      */
     public KuduCatalog(String catalogName, String kuduMasters) {
-        super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
+        super(catalogName, "default_database");
         this.kuduMasters = kuduMasters;
         this.kuduClient = createClient();
     }
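
For readers tracking the EnvironmentSettings change in the hunk above: Flink 1.15 no longer exposes the DEFAULT_BUILTIN_DATABASE constant, so the catalog now hard-codes the equivalent literal. Below is a minimal sketch of how the catalog is typically wired into a TableEnvironment under that default database; the catalog name "kudu" and the master address "localhost:7051" are placeholders, not values from this commit.

    import org.apache.flink.connectors.kudu.table.KuduCatalog;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KuduCatalogUsageSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Placeholder master address; the catalog creates its Kudu client from it.
            KuduCatalog catalog = new KuduCatalog("kudu", "localhost:7051");
            tableEnv.registerCatalog("kudu", catalog);
            tableEnv.useCatalog("kudu");

            // All tables resolve against the catalog's single database, "default_database".
            tableEnv.useDatabase("default_database");
        }
    }
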
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index 9112b0a..c46ac85 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -30,32 +30,12 @@ import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.types.Row;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.*;
+import static org.apache.flink.table.descriptors.Rowtime.*;
+import static org.apache.flink.table.descriptors.Schema.*;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
index ad98e86..74daa89 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
@@ -29,11 +29,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.sources.FilterableTableSource;
-import org.apache.flink.table.sources.LimitableTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.*;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -42,12 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Optional;
+import java.util.*;
 
 import static org.apache.flink.connectors.kudu.table.utils.KuduTableUtils.toKuduFilterInfo;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
index 847dad4..31c8d79 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
@@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.types.Row;
-
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
index 2022cd7..cde6a13 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
@@ -27,26 +27,24 @@ import org.apache.flink.connectors.kudu.table.function.lookup.KuduRowDataLookupF
 import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
-import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.*;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
+import java.util.*;
+
+import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly;
 
 /**
  * A {@link DynamicTableSource} for Kudu.
@@ -138,12 +136,28 @@ public class KuduDynamicTableSource implements ScanTableSource, SupportsProjecti
     }
 
     @Override
-    public void applyProjection(int[][] projectedFields) {
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
         // parser projectFields
-        this.physicalSchema = TableSchemaUtils.projectSchema(this.physicalSchema, projectedFields);
+        this.physicalSchema = projectSchema(this.physicalSchema, projectedFields);
         this.projectedFields = physicalSchema.getFieldNames();
     }
 
+    private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
+        checkArgument(
+                containsPhysicalColumnsOnly(tableSchema),
+                "Projection is only supported for physical columns.");
+        TableSchema.Builder builder = TableSchema.builder();
+
+        FieldsDataType fields =
+                (FieldsDataType)
+                        DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);
+        RowType topFields = (RowType) fields.getLogicalType();
+        for (int i = 0; i < topFields.getFieldCount(); i++) {
+            builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i));
+        }
+        return builder.build();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
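
The hunk above moves KuduDynamicTableSource to the two-argument applyProjection(int[][], DataType) variant of SupportsProjectionPushDown used with Flink 1.15 and rebuilds the projected TableSchema locally on top of DataTypeUtils.projectRow. A minimal sketch of what that helper does with the index paths the planner passes in; the field names are illustrative only.

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.utils.DataTypeUtils;

    public class ProjectRowSketch {
        public static void main(String[] args) {
            DataType row = DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("title", DataTypes.STRING()),
                    DataTypes.FIELD("price", DataTypes.DOUBLE()));

            // Index paths as the planner would hand them to applyProjection:
            // keep top-level fields 0 ("id") and 2 ("price").
            DataType projected = DataTypeUtils.projectRow(row, new int[][]{{0}, {2}});

            // Prints a row type containing only "id" and "price".
            System.out.println(projected);
        }
    }
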
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
index b531835..3dff23c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
@@ -22,21 +22,9 @@ import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.table.AbstractReadOnlyCatalog;
 import org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory;
 import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
@@ -52,20 +40,10 @@ import org.apache.kudu.shaded.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.*;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -86,7 +64,7 @@ public class KuduDynamicCatalog extends AbstractReadOnlyCatalog {
      * @param kuduMasters Connection address to Kudu
      */
     public KuduDynamicCatalog(String catalogName, String kuduMasters) {
-        super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
+        super(catalogName, "default_database");
         this.kuduMasters = kuduMasters;
         this.kuduClient = createClient();
     }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
index 1d5be62..f5d7ca7 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
@@ -42,18 +42,10 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS;
-import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS;
+import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.*;
 
 
 public class KuduTableUtils {
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
index c445465..15f7be7 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
@@ -20,20 +20,8 @@ package org.apache.flink.connectors.kudu.table.utils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BooleanType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.FloatType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.TinyIntType;
-import org.apache.flink.table.types.logical.VarBinaryType;
-import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.*;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
-
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Type;
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
index bcc9b2d..ac4db4c 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -36,11 +36,7 @@ import org.apache.flink.types.Row;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.*;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -76,7 +72,7 @@ public class KuduTestBase {
     public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"};
     private static GenericContainer<?> master;
     private static List<GenericContainer<?>> tServers;
-    private static String masterAddress;
+    private static HostAndPort masterAddress;
     private static KuduClient kuduClient;
 
     @BeforeAll
@@ -90,7 +86,7 @@ public class KuduTestBase {
                 .withNetwork(network)
                 .withNetworkAliases("kudu-master");
         master.start();
-        masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT)).toString();
+        masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT));
 
         for (int instance = 1; instance <= NUMBER_OF_REPLICA; instance++) {
             String instanceName = "kudu-tserver-" + instance;
@@ -98,8 +94,8 @@ public class KuduTestBase {
                     .withExposedPorts(KUDU_TSERVER_PORT)
                     .withCommand("tserver")
                     .withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT)
-                    .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false " +
-                            "--rpc_advertised_addresses=" + instanceName)
+                    .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --logtostderr "
+                            +" --use_hybrid_clock=false --rpc_advertised_addresses=" + instanceName)
                     .withNetwork(network)
                     .withNetworkAliases(instanceName)
                     .dependsOn(master);
@@ -108,8 +104,7 @@ public class KuduTestBase {
         }
         tServers = tServersBuilder.build();
 
-        System.out.println(HostAndPort.fromParts(master.getHost(), master.getMappedPort(8051)).toString());
-        kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build();
+        kuduClient = new KuduClient.KuduClientBuilder(masterAddress.toString()).build();
     }
 
     @AfterAll
@@ -239,7 +234,7 @@ public class KuduTestBase {
     }
 
     public String getMasterAddress() {
-        return masterAddress;
+        return masterAddress.toString();
     }
 
     public KuduClient getClient() {
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
index dc8f777..f53b7c5 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
@@ -16,13 +16,12 @@
  */
 package org.apache.flink.connectors.kudu.format;
 
-import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
 import org.apache.flink.types.Row;
-
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index 6791765..1764608 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.types.Row;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.CreateTableOptions;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
index 1927631..d403d9c 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
-
 import org.apache.flink.types.Row;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index f6482da..0375c68 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -40,18 +40,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.*;
 
 public class KuduTableFactoryTest extends KuduTestBase {
     private StreamTableEnvironment tableEnv;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
index d3d4a63..4a53198 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
@@ -53,7 +53,7 @@ public class KuduTableSourceITCase extends KuduTestBase {
         it.forEachRemaining(results::add);
         assertEquals(5, results.size());
         assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", results.get(0).toString());
-        tableEnv.sqlUpdate("DROP TABLE books");
+        tableEnv.executeSql("DROP TABLE books");
     }
 
 
@@ -66,6 +66,6 @@ public class KuduTableSourceITCase extends KuduTestBase {
         it.forEachRemaining(results::add);
         assertEquals(1, results.size());
         assertEquals("More Java for more dummies", results.get(0).toString());
-        tableEnv.sqlUpdate("DROP TABLE books");
+        tableEnv.executeSql("DROP TABLE books");
     }
 }
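
The test change above replaces the removed TableEnvironment.sqlUpdate with executeSql, which submits DDL eagerly and returns a TableResult. A minimal sketch, using the built-in datagen connector rather than the Kudu DDL from these tests:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ExecuteSqlSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build());

            // DDL runs immediately; no separate execute() call is needed.
            tableEnv.executeSql(
                    "CREATE TABLE books (id INT, title STRING) WITH ('connector' = 'datagen')");
            tableEnv.executeSql("DROP TABLE books");
        }
    }
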
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
index 43734e4..d4101aa 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
@@ -19,15 +19,10 @@ package org.apache.flink.connectors.kudu.table;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.*;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.types.DataType;
@@ -43,14 +38,7 @@ import java.util.List;
 import static java.util.Collections.singletonList;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EQUALS;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 /**
  * Unit Tests for {@link KuduTableSource}.
@@ -68,7 +56,7 @@ public class KuduTableSourceTest extends KuduTestBase {
         KuduTableInfo tableInfo = booksTableInfo("books", true);
         setUpDatabase(tableInfo);
         catalog = new KuduCatalog(getMasterAddress());
-        ObjectPath op = new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "books");
+        ObjectPath op = new ObjectPath("default_database", "books");
         try {
             kuduTableSource = catalog.getKuduTableFactory().createTableSource(op, catalog.getTable(op));
         } catch (TableNotExistException e) {
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
index 4eae7bf..affdd04 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
@@ -26,14 +26,14 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
 public class KuduTableTestUtils {
 
     public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamingMode(StreamExecutionEnvironment env) {
-        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
         tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
         return tableEnv;
     }
 
     public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode() {
-        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
         TableEnvironment tableEnv = TableEnvironment.create(settings);
         tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
         return tableEnv;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
index f37b40d..12d82a9 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
@@ -17,14 +17,8 @@
 package org.apache.flink.connectors.kudu.writer;
 
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
-
 import org.apache.kudu.Schema;
-import org.apache.kudu.client.Delete;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.Update;
-import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.*;
 import org.junit.jupiter.api.BeforeEach;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
index 45e0b1b..f03c469 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
@@ -17,11 +17,10 @@
 
 package org.apache.flink.connectors.kudu.writer;
 
-import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo;
 import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.PojoOperationMapper;
-
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.PartialRow;
 import org.junit.jupiter.api.Test;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
index e737063..3aeb673 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
@@ -20,7 +20,6 @@ import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
 import org.apache.flink.types.Row;
-
 import org.apache.kudu.client.Operation;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
index 308a011..a52b7c1 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
@@ -20,7 +20,6 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.TupleOperationMapper;
-
 import org.apache.kudu.client.Operation;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
diff --git a/flink-connector-netty/pom.xml b/flink-connector-netty/pom.xml
index 6f76037..ad450e3 100644
--- a/flink-connector-netty/pom.xml
+++ b/flink-connector-netty/pom.xml
@@ -82,7 +82,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <artifactId>flink-clients</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
index 5bca265..4403507 100644
--- a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
@@ -17,10 +17,10 @@
 package org.apache.flink.streaming.connectors.netty.example
 
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.EnvironmentSettings
-import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
 
 /**
  * Simple example for demonstrating the use of SQL on a Stream Table.
@@ -40,7 +40,7 @@ object StreamSqlExample {
     val param = ParameterTool.fromArgs(args)
 
     // set up execution environment
-    val envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+    val envSettings = EnvironmentSettings.newInstance().inStreamingMode().build()
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, envSettings)
 
@@ -62,8 +62,7 @@ object StreamSqlExample {
 
     // union the two tables
     val result = tEnv.sqlQuery("SELECT * FROM OrderA WHERE amount > 2")
-
-    result.toAppendStream[Order].print()
+    tEnv.toDataStream(result).print()
 
     env.execute()
   }
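
The Scala example above switches from the Scala bridge's toAppendStream conversion to the Java bridge's StreamTableEnvironment.toDataStream. A minimal Java sketch of the same pattern, with an inline fromValues source standing in for the example's OrderA stream:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class ToDataStreamSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Illustrative single-column table standing in for the example's source.
            Table orders = tEnv.fromValues(1, 3, 5).as("amount");
            Table result = tEnv.sqlQuery("SELECT * FROM " + orders + " WHERE amount > 2");

            // toDataStream() is the insert-only replacement for the removed toAppendStream.
            DataStream<Row> stream = tEnv.toDataStream(result);
            stream.print();
            env.execute();
        }
    }
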
diff --git a/flink-connector-pinot/pom.xml b/flink-connector-pinot/pom.xml
index 670504a..6b1aaab 100644
--- a/flink-connector-pinot/pom.xml
+++ b/flink-connector-pinot/pom.xml
@@ -102,7 +102,11 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
             <type>test-jar</type>
         </dependency>
         <dependency>
@@ -118,13 +122,19 @@ under the License.
             <groupId>org.apache.pinot</groupId>
             <artifactId>pinot-java-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.pinot</groupId>
             <artifactId>pinot-tools</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <artifactId>flink-test-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
deleted file mode 100644
index 7d2fe94..0000000
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.pinot;
-
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter;
-import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
-import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
-import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
-import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
-import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer;
-import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer;
-import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkWriterStateSerializer;
-import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter;
-import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
-import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment;
-import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.core.segment.name.SegmentNameGenerator;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Apache Pinot sink that stores objects from upstream Flink tasks in a Apache Pinot table. The sink
- * can be operated in {@code RuntimeExecutionMode.STREAMING} or {@code RuntimeExecutionMode.BATCH}
- * mode. But ensure to enable checkpointing when using in streaming mode.
- *
- * <p>We advise you to use the provided {@link PinotSink.Builder} to build and configure the
- * PinotSink. All the communication with the Pinot cluster's table is managed via the Pinot
- * controller. Thus you need to provide its host and port as well as the target Pinot table.
- * The {@link TableConfig} and {@link Schema} is automatically retrieved via the Pinot controller API
- * and therefore does not need to be provided.
- *
- * <p>Whenever an element is received by the sink it gets stored in a {@link PinotWriterSegment}. A
- * {@link PinotWriterSegment} represents exactly one segment that will be pushed to the Pinot
- * cluster later on. Its size is determined by the customizable {@code maxRowsPerSegment} parameter.
- * Please note that the maximum segment size that can be handled by this sink is limited by the
- * lower bound of memory available at each subTask.
- * Each subTask holds a list of {@link PinotWriterSegment}s of which at most one is active. An
- * active {@link PinotWriterSegment} is capable of accepting at least one more element. If a
- * {@link PinotWriterSegment} switches from active to inactive it flushes its
- * {@code maxRowsPerSegment} elements to disk. The data file is stored in the local filesystem's
- * temporary directory and contains serialized elements. We use the {@link JsonSerializer} to
- * serialize elements to JSON.
- *
- * <p>On checkpointing all not in-progress {@link PinotWriterSegment}s are transformed into
- * committables. As the data files need to be shared across nodes, the sink requires access to a
- * shared filesystem. We use the {@link FileSystemAdapter} for that purpose.
- * A {@link FileSystemAdapter} is capable of copying a file from the local to the shared filesystem
- * and vice-versa. A {@link PinotSinkCommittable} contains a reference to a data file on the shared
- * filesystem as well as the minimum and maximum timestamp contained in the data file. A timestamp -
- * usually the event time - is extracted from each received element via {@link EventTimeExtractor}.
- * The timestamps are later on required to follow the guideline for naming Pinot segments.
- * An eventually existent in-progress {@link PinotWriterSegment}'s state is saved in the snapshot
- * taken when checkpointing. This ensures that the at-most-once delivery guarantee can be fulfilled
- * when recovering from failures.
- *
- * <p>We use the {@link PinotSinkGlobalCommitter} to collect all created
- * {@link PinotSinkCommittable}s, create segments from the referenced data files and finally push them
- * to the Pinot table. Therefore, the minimum and maximum timestamp of all
- * {@link PinotSinkCommittable} is determined. The segment names are then generated using the
- * {@link PinotSinkSegmentNameGenerator} which gets the minimum and maximum timestamp as input.
- * The segment generation starts with downloading the referenced data file from the shared file system
- * using the provided {@link FileSystemAdapter}. Once this is was completed, we use Pinot's
- * {@link SegmentIndexCreationDriver} to generate the final segment. Each segment is thereby stored
- * in a temporary directory on the local filesystem. Next, the segment is uploaded to the Pinot
- * controller using Pinot's {@link UploadSegmentCommand}.
- *
- * <p>To ensure that possible failures are handled accordingly each segment name is checked for
- * existence within the Pinot cluster before uploading a segment. In case a segment name already
- * exists, i.e. if the last commit failed partially with some segments already been uploaded, the
- * existing segment is deleted first. When the elements since the last checkpoint are replayed the
- * minimum and maximum timestamp of all received elements will be the same. Thus the same set of
- * segment names is generated and we can delete previous segments by checking for segment name
- * presence. Note: The {@link PinotSinkSegmentNameGenerator} must be deterministic. We also provide
- * a {@link SimpleSegmentNameGenerator} which is a simple but for most users suitable segment name
- * generator.
- *
- * <p>Please note that we use the {@link GlobalCommitter} to ensure consistent segment naming. This
- * comes with performance limitations as a {@link GlobalCommitter} always runs at a parallelism of 1
- * which results in a clear bottleneck at the {@link PinotSinkGlobalCommitter} that does all the
- * computational intensive work (i.e. generating and uploading segments). In order to overcome this
- * issue we introduce a custom multithreading approach within the {@link PinotSinkGlobalCommitter}
- * to parallelize the segment creation and upload process.
- *
- * @param <IN> Type of incoming elements
- */
-public class PinotSink<IN> implements Sink<IN, PinotSinkCommittable, PinotSinkWriterState, PinotSinkGlobalCommittable> {
-
-    private final String pinotControllerHost;
-    private final String pinotControllerPort;
-    private final String tableName;
-    private final int maxRowsPerSegment;
-    private final String tempDirPrefix;
-    private final JsonSerializer<IN> jsonSerializer;
-    private final SegmentNameGenerator segmentNameGenerator;
-    private final FileSystemAdapter fsAdapter;
-    private final EventTimeExtractor<IN> eventTimeExtractor;
-    private final int numCommitThreads;
-
-    /**
-     * @param pinotControllerHost  Host of the Pinot controller
-     * @param pinotControllerPort  Port of the Pinot controller
-     * @param tableName            Target table's name
-     * @param maxRowsPerSegment    Maximum number of rows to be stored within a Pinot segment
-     * @param tempDirPrefix        Prefix for temp directories used
-     * @param jsonSerializer       Serializer used to convert elements to JSON
-     * @param eventTimeExtractor   Defines the way event times are extracted from received objects
-     * @param segmentNameGenerator Pinot segment name generator
-     * @param fsAdapter            Filesystem adapter used to save files for sharing files across nodes
-     * @param numCommitThreads     Number of threads used in the {@link PinotSinkGlobalCommitter} for committing segments
-     */
-    private PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName,
-                      int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer,
-                      EventTimeExtractor<IN> eventTimeExtractor,
-                      SegmentNameGenerator segmentNameGenerator, FileSystemAdapter fsAdapter,
-                      int numCommitThreads) {
-        this.pinotControllerHost = checkNotNull(pinotControllerHost);
-        this.pinotControllerPort = checkNotNull(pinotControllerPort);
-        this.tableName = checkNotNull(tableName);
-
-        checkArgument(maxRowsPerSegment > 0);
-        this.maxRowsPerSegment = maxRowsPerSegment;
-        this.tempDirPrefix = checkNotNull(tempDirPrefix);
-        this.jsonSerializer = checkNotNull(jsonSerializer);
-        this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
-        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
-        this.fsAdapter = checkNotNull(fsAdapter);
-        checkArgument(numCommitThreads > 0);
-        this.numCommitThreads = numCommitThreads;
-    }
-
-    /**
-     * Creates a Pinot sink writer.
-     *
-     * @param context InitContext
-     * @param states  State extracted from snapshot. This list must not have a size larger than 1
-     */
-    @Override
-    public PinotSinkWriter<IN> createWriter(InitContext context, List<PinotSinkWriterState> states) {
-        PinotSinkWriter<IN> writer = new PinotSinkWriter<>(
-                context.getSubtaskId(), maxRowsPerSegment, eventTimeExtractor,
-                jsonSerializer, fsAdapter
-        );
-
-        if (states.size() == 1) {
-            writer.initializeState(states.get(0));
-        } else if (states.size() > 1) {
-            throw new IllegalStateException("Did not expected more than one element in states.");
-        }
-        return writer;
-    }
-
-    /**
-     * The PinotSink does not use a committer. Instead a global committer is used
-     *
-     * @return Empty Optional
-     */
-    @Override
-    public Optional<Committer<PinotSinkCommittable>> createCommitter() {
-        return Optional.empty();
-    }
-
-    /**
-     * Creates the global committer.
-     */
-    @Override
-    public Optional<GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable>> createGlobalCommitter() throws IOException {
-        String timeColumnName = eventTimeExtractor.getTimeColumn();
-        TimeUnit segmentTimeUnit = eventTimeExtractor.getSegmentTimeUnit();
-        PinotSinkGlobalCommitter committer = new PinotSinkGlobalCommitter(
-                pinotControllerHost, pinotControllerPort, tableName, segmentNameGenerator,
-                tempDirPrefix, fsAdapter, timeColumnName, segmentTimeUnit, numCommitThreads
-        );
-        return Optional.of(committer);
-    }
-
-    /**
-     * Creates the committables' serializer.
-     */
-    @Override
-    public Optional<SimpleVersionedSerializer<PinotSinkCommittable>> getCommittableSerializer() {
-        return Optional.of(new PinotSinkCommittableSerializer());
-    }
-
-    /**
-     * Creates the global committables' serializer.
-     */
-    @Override
-    public Optional<SimpleVersionedSerializer<PinotSinkGlobalCommittable>> getGlobalCommittableSerializer() {
-        return Optional.of(new PinotSinkGlobalCommittableSerializer());
-    }
-
-    /**
-     * The PinotSink does not use writer states.
-     *
-     * @return Empty Optional
-     */
-    @Override
-    public Optional<SimpleVersionedSerializer<PinotSinkWriterState>> getWriterStateSerializer() {
-        return Optional.of(new PinotSinkWriterStateSerializer());
-    }
-
-    /**
-     * Builder for configuring a {@link PinotSink}. This is the recommended public API.
-     *
-     * @param <IN> Type of incoming elements
-     */
-    public static class Builder<IN> {
-
-        static final int DEFAULT_COMMIT_THREADS = 4;
-
-        String pinotControllerHost;
-        String pinotControllerPort;
-        String tableName;
-        int maxRowsPerSegment;
-        String tempDirPrefix = "flink-connector-pinot";
-        JsonSerializer<IN> jsonSerializer;
-        EventTimeExtractor<IN> eventTimeExtractor;
-        SegmentNameGenerator segmentNameGenerator;
-        FileSystemAdapter fsAdapter;
-        int numCommitThreads = DEFAULT_COMMIT_THREADS;
-
-        /**
-         * Defines the basic connection parameters.
-         *
-         * @param pinotControllerHost Host of the Pinot controller
-         * @param pinotControllerPort Port of the Pinot controller
-         * @param tableName           Target table's name
-         */
-        public Builder(String pinotControllerHost, String pinotControllerPort, String tableName) {
-            this.pinotControllerHost = pinotControllerHost;
-            this.pinotControllerPort = pinotControllerPort;
-            this.tableName = tableName;
-        }
-
-        /**
-         * Defines the serializer used to serialize elements to JSON format.
-         *
-         * @param jsonSerializer JsonSerializer
-         * @return Builder
-         */
-        public Builder<IN> withJsonSerializer(JsonSerializer<IN> jsonSerializer) {
-            this.jsonSerializer = jsonSerializer;
-            return this;
-        }
-
-        /**
-         * Defines the EventTimeExtractor<IN> used to extract event times from received objects.
-         *
-         * @param eventTimeExtractor EventTimeExtractor
-         * @return Builder
-         */
-        public Builder<IN> withEventTimeExtractor(EventTimeExtractor<IN> eventTimeExtractor) {
-            this.eventTimeExtractor = eventTimeExtractor;
-            return this;
-        }
-
-        /**
-         * Defines the SegmentNameGenerator used to generate names for the segments pushed to Pinot.
-         *
-         * @param segmentNameGenerator SegmentNameGenerator
-         * @return Builder
-         */
-        public Builder<IN> withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
-            this.segmentNameGenerator = segmentNameGenerator;
-            return this;
-        }
-
-        /**
-         * Defines a basic segment name generator which will be used to generate names for the
-         * segments pushed to Pinot.
-         *
-         * @param segmentNamePostfix Postfix which will be appended to the segment name to identify
-         *                           segments coming from this Flink sink
-         * @return Builder
-         */
-        public Builder<IN> withSimpleSegmentNameGenerator(String segmentNamePostfix) {
-            return withSegmentNameGenerator(new SimpleSegmentNameGenerator(tableName, segmentNamePostfix));
-        }
-
-        /**
-         * Defines the FileSystemAdapter used share data files between the {@link PinotSinkWriter} and
-         * the {@link PinotSinkGlobalCommitter}.
-         *
-         * @param fsAdapter Adapter for interacting with the shared file system
-         * @return Builder
-         */
-        public Builder<IN> withFileSystemAdapter(FileSystemAdapter fsAdapter) {
-            this.fsAdapter = fsAdapter;
-            return this;
-        }
-
-        /**
-         * Defines the segment size via the maximum number of elements per segment.
-         *
-         * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
-         * @return Builder
-         */
-        public Builder<IN> withMaxRowsPerSegment(int maxRowsPerSegment) {
-            this.maxRowsPerSegment = maxRowsPerSegment;
-            return this;
-        }
-
-        /**
-         * Defines the path prefix for the files created in a node's local filesystem.
-         *
-         * @param tempDirPrefix Prefix for temp directories used
-         * @return Builder
-         */
-        public Builder<IN> withTempDirectoryPrefix(String tempDirPrefix) {
-            this.tempDirPrefix = tempDirPrefix;
-            return this;
-        }
-
-        /**
-         * Defines the number of threads that shall be used to commit segments in the {@link PinotSinkGlobalCommitter}.
-         *
-         * @param numCommitThreads Number of threads
-         * @return Builder
-         */
-        public Builder<IN> withNumCommitThreads(int numCommitThreads) {
-            this.numCommitThreads = numCommitThreads;
-            return this;
-        }
-
-        /**
-         * Finally builds the {@link PinotSink} according to the configuration.
-         *
-         * @return PinotSink
-         */
-        public PinotSink<IN> build() {
-            return new PinotSink<>(
-                    pinotControllerHost,
-                    pinotControllerPort,
-                    tableName,
-                    maxRowsPerSegment,
-                    tempDirPrefix,
-                    jsonSerializer,
-                    eventTimeExtractor,
-                    segmentNameGenerator,
-                    fsAdapter,
-                    numCommitThreads
-            );
-        }
-    }
-}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java
deleted file mode 100644
index 8e45620..0000000
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.pinot.serializer;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Serializer for {@link PinotSinkGlobalCommittable}
- */
-@Internal
-public class PinotSinkGlobalCommittableSerializer implements SimpleVersionedSerializer<PinotSinkGlobalCommittable> {
-
-    private static final int CURRENT_VERSION = 1;
-
-    @Override
-    public int getVersion() {
-        return CURRENT_VERSION;
-    }
-
-    @Override
-    public byte[] serialize(PinotSinkGlobalCommittable pinotSinkGlobalCommittable) throws IOException {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-             DataOutputStream out = new DataOutputStream(baos)) {
-            out.writeLong(pinotSinkGlobalCommittable.getMinTimestamp());
-            out.writeLong(pinotSinkGlobalCommittable.getMaxTimestamp());
-
-            int size = pinotSinkGlobalCommittable.getDataFilePaths().size();
-            out.writeInt(size);
-            for (String dataFilePath : pinotSinkGlobalCommittable.getDataFilePaths()) {
-                out.writeUTF(dataFilePath);
-            }
-            out.flush();
-            return baos.toByteArray();
-        }
-    }
-
-    @Override
-    public PinotSinkGlobalCommittable deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
-        switch (version) {
-            case 1:
-                return deserializeV1(serialized);
-            default:
-                throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
-        }
-    }
-
-    private PinotSinkGlobalCommittable deserializeV1(byte[] serialized) throws IOException {
-        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
-             DataInputStream in = new DataInputStream(bais)) {
-            long minTimestamp = in.readLong();
-            long maxTimestamp = in.readLong();
-
-            long size = in.readInt();
-            List<String> dataFilePaths = new ArrayList<>();
-            for (int i = 0; i < size; i++) {
-                dataFilePaths.add(in.readUTF());
-            }
-            return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp);
-        }
-    }
-}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSink.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSink.java
new file mode 100644
index 0000000..b300b89
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSink.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.v2;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommitter;
+import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter;
+import org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriterState;
+import org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriterStateSerializer;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class PinotSink<IN> implements
+        Sink<IN>,
+        StatefulSink<IN, PinotWriterState>,
+        TwoPhaseCommittingSink<IN, PinotSinkCommittable> {
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final int maxRowsPerSegment;
+    private final String tempDirPrefix;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final FileSystemAdapter fsAdapter;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final int numCommitThreads;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param maxRowsPerSegment    Maximum number of rows to be stored within a Pinot segment
+     * @param tempDirPrefix        Prefix for temp directories used
+     * @param jsonSerializer       Serializer used to convert elements to JSON
+     * @param eventTimeExtractor   Defines the way event times are extracted from received objects
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter            Filesystem adapter used to save files for sharing files across nodes
+     * @param numCommitThreads     Number of threads used in the {@link PinotSinkCommitter} for committing segments
+     */
+    protected PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName,
+                      int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer,
+                      EventTimeExtractor<IN> eventTimeExtractor,
+                      SegmentNameGenerator segmentNameGenerator, FileSystemAdapter fsAdapter,
+                      int numCommitThreads) {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+
+        checkArgument(maxRowsPerSegment > 0);
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        checkArgument(numCommitThreads > 0);
+        this.numCommitThreads = numCommitThreads;
+    }
+
+    @Override
+    public PinotWriter<IN> createWriter(InitContext context) throws IOException {
+        return this.restoreWriter(context, Collections.emptyList());
+    }
+
+    @Override
+    public PinotWriter<IN> restoreWriter(InitContext context, Collection<PinotWriterState> states) throws IOException {
+        PinotWriter<IN> writer = new PinotWriter<>(
+                context.getSubtaskId(), maxRowsPerSegment, eventTimeExtractor,
+                jsonSerializer, fsAdapter, states, this.createCommitter()
+        );
+        return writer;
+    }
+
+    @Override
+    public PinotSinkCommitter createCommitter() throws IOException {
+        String timeColumnName = eventTimeExtractor.getTimeColumn();
+        TimeUnit segmentTimeUnit = eventTimeExtractor.getSegmentTimeUnit();
+        PinotSinkCommitter committer = new PinotSinkCommitter(
+                pinotControllerHost, pinotControllerPort, tableName, segmentNameGenerator,
+                tempDirPrefix, fsAdapter, timeColumnName, segmentTimeUnit, numCommitThreads
+        );
+        return committer;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PinotSinkCommittable> getCommittableSerializer() {
+        return new PinotSinkCommittableSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PinotWriterState> getWriterStateSerializer() {
+        return new PinotWriterStateSerializer();
+    }
+
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkBuilder.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkBuilder.java
new file mode 100644
index 0000000..5af5c1d
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkBuilder.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.v2;
+
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommitter;
+import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+
+public class PinotSinkBuilder<IN> {
+
+    static final int DEFAULT_COMMIT_THREADS = 4;
+
+    String pinotControllerHost;
+    String pinotControllerPort;
+    String tableName;
+    int maxRowsPerSegment;
+    String tempDirPrefix = "flink-connector-pinot";
+    JsonSerializer<IN> jsonSerializer;
+    EventTimeExtractor<IN> eventTimeExtractor;
+    SegmentNameGenerator segmentNameGenerator;
+    FileSystemAdapter fsAdapter;
+    int numCommitThreads = DEFAULT_COMMIT_THREADS;
+
+    /**
+     * Defines the basic connection parameters.
+     *
+     * @param pinotControllerHost Host of the Pinot controller
+     * @param pinotControllerPort Port of the Pinot controller
+     * @param tableName           Target table's name
+     */
+    public PinotSinkBuilder(String pinotControllerHost, String pinotControllerPort, String tableName) {
+        this.pinotControllerHost = pinotControllerHost;
+        this.pinotControllerPort = pinotControllerPort;
+        this.tableName = tableName;
+    }
+
+    /**
+     * Defines the serializer used to serialize elements to JSON format.
+     *
+     * @param jsonSerializer JsonSerializer
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withJsonSerializer(JsonSerializer<IN> jsonSerializer) {
+        this.jsonSerializer = jsonSerializer;
+        return this;
+    }
+
+    /**
+     * Defines the EventTimeExtractor<IN> used to extract event times from received objects.
+     *
+     * @param eventTimeExtractor EventTimeExtractor
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withEventTimeExtractor(EventTimeExtractor<IN> eventTimeExtractor) {
+        this.eventTimeExtractor = eventTimeExtractor;
+        return this;
+    }
+
+    /**
+     * Defines the SegmentNameGenerator used to generate names for the segments pushed to Pinot.
+     *
+     * @param segmentNameGenerator SegmentNameGenerator
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
+        this.segmentNameGenerator = segmentNameGenerator;
+        return this;
+    }
+
+    /**
+     * Defines a basic segment name generator which will be used to generate names for the
+     * segments pushed to Pinot.
+     *
+     * @param segmentNamePostfix Postfix which will be appended to the segment name to identify
+     *                           segments coming from this Flink sink
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withSimpleSegmentNameGenerator(String segmentNamePostfix) {
+        return withSegmentNameGenerator(new SimpleSegmentNameGenerator(tableName, segmentNamePostfix));
+    }
+
+    /**
+     * Defines the FileSystemAdapter used to share data files between the {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter} and
+     * the {@link PinotSinkCommitter}.
+     *
+     * @param fsAdapter Adapter for interacting with the shared file system
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withFileSystemAdapter(FileSystemAdapter fsAdapter) {
+        this.fsAdapter = fsAdapter;
+        return this;
+    }
+
+    /**
+     * Defines the segment size via the maximum number of elements per segment.
+     *
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withMaxRowsPerSegment(int maxRowsPerSegment) {
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        return this;
+    }
+
+    /**
+     * Defines the path prefix for the files created in a node's local filesystem.
+     *
+     * @param tempDirPrefix Prefix for temp directories used
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withTempDirectoryPrefix(String tempDirPrefix) {
+        this.tempDirPrefix = tempDirPrefix;
+        return this;
+    }
+
+    /**
+     * Defines the number of threads that shall be used to commit segments in the {@link PinotSinkCommitter}.
+     *
+     * @param numCommitThreads Number of threads
+     * @return Builder
+     */
+    public PinotSinkBuilder<IN> withNumCommitThreads(int numCommitThreads) {
+        this.numCommitThreads = numCommitThreads;
+        return this;
+    }
+
+    /**
+     * Finally builds the {@link PinotSink} according to the configuration.
+     *
+     * @return PinotSink
+     */
+    public PinotSink<IN> build() {
+        return new PinotSink<>(
+                pinotControllerHost,
+                pinotControllerPort,
+                tableName,
+                maxRowsPerSegment,
+                tempDirPrefix,
+                jsonSerializer,
+                eventTimeExtractor,
+                segmentNameGenerator,
+                fsAdapter,
+                numCommitThreads
+        );
+    }
+}
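
For orientation, the sketch below shows how the new builder could be wired into a Sink V2 job via DataStream#sinkTo. It is a minimal, hedged example: the controller address ("pinot-controller:9000"), the table name ("visits"), and the serializer/extractor/filesystem-adapter arguments are illustrative assumptions supplied by the caller, not part of this commit (example implementations of the first two are sketched after the EventTimeExtractor and JsonSerializer files further below).

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
    import org.apache.flink.streaming.connectors.pinot.v2.PinotSink;
    import org.apache.flink.streaming.connectors.pinot.v2.PinotSinkBuilder;
    import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;
    import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;

    public final class PinotSinkV2Example {

        /**
         * Builds a v2 PinotSink and attaches it to the given stream. The serializer,
         * extractor and filesystem adapter are user-provided implementations.
         */
        public static <T> void attachPinotSink(DataStream<T> records,
                                               JsonSerializer<T> serializer,
                                               EventTimeExtractor<T> timeExtractor,
                                               FileSystemAdapter fsAdapter) {
            PinotSink<T> sink = new PinotSinkBuilder<T>("pinot-controller", "9000", "visits")
                    .withMaxRowsPerSegment(100_000)           // roll over to a new segment every 100k rows
                    .withJsonSerializer(serializer)
                    .withEventTimeExtractor(timeExtractor)
                    .withSimpleSegmentNameGenerator("flink-sink")
                    .withFileSystemAdapter(fsAdapter)
                    .withNumCommitThreads(4)                  // threads used by PinotSinkCommitter for segment upload
                    .build();

            records.sinkTo(sink);                             // Sink V2 entry point, replaces the v1 sink wiring
        }

        private PinotSinkV2Example() {
        }
    }
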
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittable.java
similarity index 95%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittable.java
index 5a8c655..611655c 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot.committer;
+package org.apache.flink.streaming.connectors.pinot.v2.committer;
 
 import org.apache.flink.annotation.Internal;
 
@@ -26,7 +26,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The PinotSinkCommittable is required for sharing committables with the
- * {@link PinotSinkGlobalCommitter} instance
+ * {@link PinotSinkCommitter} instance
  */
 @Internal
 public class PinotSinkCommittable implements Serializable {
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittableSerializer.java
similarity index 94%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittableSerializer.java
index ed61de2..cce37ae 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommittableSerializer.java
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot.serializer;
+package org.apache.flink.streaming.connectors.pinot.v2.committer;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
 
 import java.io.*;
 
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java
similarity index 74%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java
index 46e03e4..f6e0d51 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/committer/PinotSinkCommitter.java
@@ -15,11 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot.committer;
+package org.apache.flink.streaming.connectors.pinot.v2.committer;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.streaming.connectors.pinot.PinotControllerClient;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
 import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
 import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils;
 import org.apache.pinot.common.segment.ReadMode;
@@ -39,26 +41,24 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.*;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * Global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter},
  * generates segments and pushes them to the Pinot controller.
  * Note: We use a custom multithreading approach to parallelize the segment creation and upload to
  * overcome the performance limitations resulting from using a {@link GlobalCommitter} always
  * running at a parallelism of 1.
  */
 @Internal
-public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+public class PinotSinkCommitter implements Committer<PinotSinkCommittable> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkCommitter.class);
 
     private final String pinotControllerHost;
     private final String pinotControllerPort;
@@ -84,11 +84,11 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
      * @param segmentTimeUnit      Unit of the time column
      * @param numCommitThreads     Number of threads used to commit the committables
      */
-    public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort,
-                                    String tableName, SegmentNameGenerator segmentNameGenerator,
-                                    String tempDirPrefix, FileSystemAdapter fsAdapter,
-                                    String timeColumnName, TimeUnit segmentTimeUnit,
-                                    int numCommitThreads) throws IOException {
+    public PinotSinkCommitter(String pinotControllerHost, String pinotControllerPort,
+                              String tableName, SegmentNameGenerator segmentNameGenerator,
+                              String tempDirPrefix, FileSystemAdapter fsAdapter,
+                              String timeColumnName, TimeUnit segmentTimeUnit,
+                              int numCommitThreads) throws IOException {
         this.pinotControllerHost = checkNotNull(pinotControllerHost);
         this.pinotControllerPort = checkNotNull(pinotControllerPort);
         this.tableName = checkNotNull(tableName);
@@ -110,45 +110,98 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
         this.pool = Executors.newFixedThreadPool(numCommitThreads);
     }
 
-    /**
-     * Identifies global committables that need to be re-committed from a list of recovered committables.
-     *
-     * @param globalCommittables List of global committables that are checked for required re-commit
-     * @return List of global committable that need to be re-committed
-     * @throws IOException
-     */
+
     @Override
-    public List<PinotSinkGlobalCommittable> filterRecoveredCommittables(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
-        // Holds identified global committables whose commit needs to be retried
-        List<PinotSinkGlobalCommittable> committablesToRetry = new ArrayList<>();
+    public void commit(Collection<CommitRequest<PinotSinkCommittable>> collection) throws IOException, InterruptedException {
+        Collection<PinotSinkCommittable> committables = collection.stream()
+                .map(CommitRequest::getCommittable)
+                .collect(Collectors.toList());
+
+        this.commitSink(committables);
+    }
 
-        for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
-            CommitStatus commitStatus = getCommitStatus(globalCommittable);
+    public void commitSink(Collection<PinotSinkCommittable> collection) throws IOException, InterruptedException {
+        if (collection.isEmpty()) return;
 
-            if (commitStatus.getMissingSegmentNames().isEmpty()) {
-                // All segments were already committed. Thus, we do not need to retry the commit.
-                continue;
+        // Collects the global committable in case any of its segment commits fails; failures are logged below
+        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
+
+        PinotSinkGlobalCommittable globalCommittable = this.combine(new ArrayList<>(collection));
+
+        filterRecoveredCommittables(globalCommittable);
+
+        Set<Future<Boolean>> resultFutures = new HashSet<>();
+        // Commit all segments in globalCommittable
+        for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
+            String dataFilePath = globalCommittable.getDataFilePaths().get(sequenceId);
+            // Get segment names with increasing sequenceIds
+            String segmentName = getSegmentName(globalCommittable, sequenceId);
+            // Segment committer handling the whole commit process for a single segment
+            Callable<Boolean> segmentCommitter = new SegmentCommitter(
+                    pinotControllerHost, pinotControllerPort, tempDirectory, fsAdapter,
+                    dataFilePath, segmentName, tableSchema, tableConfig, timeColumnName,
+                    segmentTimeUnit
+            );
+            // Submits the segment committer to the thread pool
+            resultFutures.add(pool.submit(segmentCommitter));
+        }
+
+        boolean commitSucceeded = true;
+        try {
+            for (Future<Boolean> wasSuccessful : resultFutures) {
+                // In case any of the segment commits wasn't successful we mark the whole
+                // globalCommittable as failed
+                if (!wasSuccessful.get()) {
+                    commitSucceeded = false;
+                    failedCommits.add(globalCommittable);
+                    // Once any of the commits failed, we do not need to check the remaining
+                    // ones, as we try to commit the globalCommittable next time
+                    break;
+                }
             }
+        } catch (InterruptedException | ExecutionException e) {
+            // In case of an exception thrown while accessing commit status, mark the whole
+            // globalCommittable as failed
+            failedCommits.add(globalCommittable);
+            LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e);
+        }
 
-            for (String existingSegment : commitStatus.getExistingSegmentNames()) {
-                // Some but not all segments were already committed. As we cannot assure the data
-                // files containing the same data as originally when recovering from failure,
-                // we delete the already committed segments in order to recommit them later on.
-                pinotControllerClient.deleteSegment(tableName, existingSegment);
+        if (commitSucceeded) {
+            // If commit succeeded, cleanup the data files stored on the shared file system. In
+            // case the commit of at least one of the segments failed, nothing will be cleaned
+            // up here to enable retrying failed commits (data files must therefore stay
+            // available on the shared filesystem).
+            for (String path : globalCommittable.getDataFilePaths()) {
+                fsAdapter.deleteFromSharedFileSystem(path);
             }
-            committablesToRetry.add(globalCommittable);
         }
-        return committablesToRetry;
+
+        if (!failedCommits.isEmpty()) {
+            LOG.error("Committing failed for {} global committable(s): {}", failedCommits.size(), failedCommits);
+        }
+    }
+
+    /**
+     * Helper method for generating segment names using the segment name generator.
+     *
+     * @param globalCommittable Global committable the segment name shall be generated from
+     * @param sequenceId        Incrementing counter
+     * @return generated segment name
+     */
+    private String getSegmentName(PinotSinkGlobalCommittable globalCommittable, int sequenceId) {
+        return segmentNameGenerator.generateSegmentName(sequenceId,
+                globalCommittable.getMinTimestamp(), globalCommittable.getMaxTimestamp());
     }
 
     /**
      * Combines multiple {@link PinotSinkCommittable}s into one {@link PinotSinkGlobalCommittable}
      * by finding the minimum and maximum timestamps from the provided {@link PinotSinkCommittable}s.
      *
-     * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}
+     * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter}
      * @return Global committer committable
      */
-    @Override
     public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> committables) {
         List<String> dataFilePaths = new ArrayList<>();
         long minTimestamp = Long.MAX_VALUE;
@@ -167,100 +220,26 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
     }
 
     /**
-     * Copies data files from shared filesystem to the local filesystem, generates segments with names
-     * according to the segment naming schema and finally pushes the segments to the Pinot cluster.
-     * Before pushing a segment it is checked whether there already exists a segment with that name
-     * in the Pinot cluster by calling the Pinot controller. In case there is one, it gets deleted.
+     * Prepares a recovered global committable for re-commit by deleting segments that were already committed.
      *
-     * @param globalCommittables List of global committables
-     * @return Global committables whose commit failed
+     * @param globalCommittable Global committable that is checked for a required re-commit
+     * @return The same global committable, ready to be re-committed
      * @throws IOException
      */
-    @Override
-    public List<PinotSinkGlobalCommittable> commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
-        // List of failed global committables that can be retried later on
-        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
-
-        for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
-            Set<Future<Boolean>> resultFutures = new HashSet<>();
-            // Commit all segments in globalCommittable
-            for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
-                String dataFilePath = globalCommittable.getDataFilePaths().get(sequenceId);
-                // Get segment names with increasing sequenceIds
-                String segmentName = getSegmentName(globalCommittable, sequenceId);
-                // Segment committer handling the whole commit process for a single segment
-                Callable<Boolean> segmentCommitter = new SegmentCommitter(
-                        pinotControllerHost, pinotControllerPort, tempDirectory, fsAdapter,
-                        dataFilePath, segmentName, tableSchema, tableConfig, timeColumnName,
-                        segmentTimeUnit
-                );
-                // Submits the segment committer to the thread pool
-                resultFutures.add(pool.submit(segmentCommitter));
-            }
+    private PinotSinkGlobalCommittable filterRecoveredCommittables(PinotSinkGlobalCommittable globalCommittable) throws IOException {
 
-            boolean commitSucceeded = true;
-            try {
-                for (Future<Boolean> wasSuccessful : resultFutures) {
-                    // In case any of the segment commits wasn't successful we mark the whole
-                    // globalCommittable as failed
-                    if (!wasSuccessful.get()) {
-                        commitSucceeded = false;
-                        failedCommits.add(globalCommittable);
-                        // Once any of the commits failed, we do not need to check the remaining
-                        // ones, as we try to commit the globalCommittable next time
-                        break;
-                    }
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                // In case of an exception thrown while accessing commit status, mark the whole
-                // globalCommittable as failed
-                failedCommits.add(globalCommittable);
-                LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e);
-            }
+        CommitStatus commitStatus = getCommitStatus(globalCommittable);
 
-            if (commitSucceeded) {
-                // If commit succeeded, cleanup the data files stored on the shared file system. In
-                // case the commit of at least one of the segments failed, nothing will be cleaned
-                // up here to enable retrying failed commits (data files must therefore stay
-                // available on the shared filesystem).
-                for (String path : globalCommittable.getDataFilePaths()) {
-                    fsAdapter.deleteFromSharedFileSystem(path);
-                }
+        if (!commitStatus.getMissingSegmentNames().isEmpty()) {
+            for (String existingSegment : commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we cannot assure the data
+                // files containing the same data as originally when recovering from failure,
+                // we delete the already committed segments in order to recommit them later on.
+                pinotControllerClient.deleteSegment(tableName, existingSegment);
             }
         }
 
-        // Return failed commits so that they can be retried later on
-        return failedCommits;
-    }
-
-    /**
-     * Empty method.
-     */
-    @Override
-    public void endOfInput() {
-    }
-
-    /**
-     * Closes the Pinot controller http client, clears the created temporary directory and
-     * shuts the thread pool down.
-     */
-    @Override
-    public void close() throws IOException {
-        pinotControllerClient.close();
-        tempDirectory.delete();
-        pool.shutdown();
-    }
-
-    /**
-     * Helper method for generating segment names using the segment name generator.
-     *
-     * @param globalCommittable Global committable the segment name shall be generated from
-     * @param sequenceId        Incrementing counter
-     * @return generated segment name
-     */
-    private String getSegmentName(PinotSinkGlobalCommittable globalCommittable, int sequenceId) {
-        return segmentNameGenerator.generateSegmentName(sequenceId,
-                globalCommittable.getMinTimestamp(), globalCommittable.getMaxTimestamp());
+        return globalCommittable;
     }
 
     /**
@@ -290,6 +269,13 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
         return new CommitStatus(existingSegmentNames, missingSegmentNames);
     }
 
+    @Override
+    public void close() throws Exception {
+        pinotControllerClient.close();
+        tempDirectory.delete();
+        pool.shutdown();
+    }
+
     /**
      * Wrapper for existing and missing segments in the Pinot cluster.
      */
@@ -315,7 +301,7 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
      * Helper class for committing a single segment. Downloads a data file from the shared filesystem,
      * generates a segment from the data file and uploads segment to the Pinot controller.
      */
-    private static class SegmentCommitter implements Callable<Boolean> {
+    public static class SegmentCommitter implements Callable<Boolean> {
 
         private static final Logger LOG = LoggerFactory.getLogger(SegmentCommitter.class);
 
@@ -342,11 +328,11 @@ public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommit
          * @param timeColumnName      Name of the column containing the timestamp
          * @param segmentTimeUnit     Unit of the time column
          */
-        SegmentCommitter(String pinotControllerHost, String pinotControllerPort,
-                         File tempDirectory, FileSystemAdapter fsAdapter,
-                         String dataFilePath, String segmentName, Schema tableSchema,
-                         TableConfig tableConfig, String timeColumnName,
-                         TimeUnit segmentTimeUnit) {
+        public SegmentCommitter(String pinotControllerHost, String pinotControllerPort,
+                                File tempDirectory, FileSystemAdapter fsAdapter,
+                                String dataFilePath, String segmentName, Schema tableSchema,
+                                TableConfig tableConfig, String timeColumnName,
+                                TimeUnit segmentTimeUnit) {
             this.pinotControllerHost = pinotControllerHost;
             this.pinotControllerPort = pinotControllerPort;
             this.tempDirectory = tempDirectory;
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/EventTimeExtractor.java
similarity index 51%
copy from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
copy to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/EventTimeExtractor.java
index 0e23e2f..755e14f 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/EventTimeExtractor.java
@@ -16,32 +16,36 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.external;
 
-import java.io.Serializable;
-import java.util.List;
-
-public class PinotSinkWriterState implements Serializable {
-
-    private final List<String> serializedElements;
-    private final long minTimestamp;
-    private final long maxTimestamp;
-
-    public PinotSinkWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
-        this.serializedElements = serializedElements;
-        this.minTimestamp = minTimestamp;
-        this.maxTimestamp = maxTimestamp;
-    }
+import org.apache.flink.api.connector.sink2.SinkWriter;
 
-    public List<String> getSerializedElements() {
-        return serializedElements;
-    }
-
-    public long getMinTimestamp() {
-        return minTimestamp;
-    }
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
 
-    public long getMaxTimestamp() {
-        return maxTimestamp;
-    }
+/**
+ * Defines the interface for event time extractors.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public interface EventTimeExtractor<IN> extends Serializable {
+
+    /**
+     * Extracts event time from incoming elements.
+     *
+     * @param element Incoming element
+     * @param context Context of SinkWriter
+     * @return timestamp
+     */
+    long getEventTime(IN element, SinkWriter.Context context);
+
+    /**
+     * @return Name of column in Pinot target table that contains the timestamp.
+     */
+    String getTimeColumn();
+
+    /**
+     * @return Unit of the time column in the Pinot target table.
+     */
+    TimeUnit getSegmentTimeUnit();
 }
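
As an illustration of the new interface's contract, here is a minimal sketch of an extractor; it assumes the stream's elements are themselves epoch-millisecond timestamps and that the Pinot table's time column is named "visitTime" (both are assumptions made for this example, not part of the commit).

    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;

    import java.util.concurrent.TimeUnit;

    /** Minimal example: each element is already an epoch-millisecond timestamp. */
    public class EpochMillisTimeExtractor implements EventTimeExtractor<Long> {

        @Override
        public long getEventTime(Long element, SinkWriter.Context context) {
            return element;                    // the element itself carries the event time
        }

        @Override
        public String getTimeColumn() {
            return "visitTime";                // assumed time column of the target Pinot table
        }

        @Override
        public TimeUnit getSegmentTimeUnit() {
            return TimeUnit.MILLISECONDS;      // unit of the values returned by getEventTime()
        }
    }
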
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/JsonSerializer.java
similarity index 52%
copy from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
copy to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/JsonSerializer.java
index 0e23e2f..4f366f5 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/external/JsonSerializer.java
@@ -16,32 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.external;
 
 import java.io.Serializable;
-import java.util.List;
 
-public class PinotSinkWriterState implements Serializable {
-
-    private final List<String> serializedElements;
-    private final long minTimestamp;
-    private final long maxTimestamp;
-
-    public PinotSinkWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
-        this.serializedElements = serializedElements;
-        this.minTimestamp = minTimestamp;
-        this.maxTimestamp = maxTimestamp;
-    }
-
-    public List<String> getSerializedElements() {
-        return serializedElements;
-    }
-
-    public long getMinTimestamp() {
-        return minTimestamp;
-    }
+/**
+ * Defines the base class for serializing incoming elements to JSON format.
+ * The JSON format is expected during Pinot segment creation.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public abstract class JsonSerializer<IN> implements Serializable {
 
-    public long getMaxTimestamp() {
-        return maxTimestamp;
-    }
+    public abstract String toJson(IN element);
 }
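
A hedged sketch of a concrete serializer follows; it assumes Jackson's ObjectMapper is available on the classpath and that records are Map-shaped, but any library that produces one JSON object per element would do.

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;

    import java.util.Map;

    /** Example serializer turning Map-shaped records into JSON via Jackson (an assumption). */
    public class MapJsonSerializer extends JsonSerializer<Map<String, Object>> {

        // ObjectMapper is not Serializable, so it is created lazily on the task manager.
        private transient ObjectMapper mapper;

        @Override
        public String toJson(Map<String, Object> element) {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            try {
                return mapper.writeValueAsString(element);
            } catch (JsonProcessingException e) {
                throw new IllegalStateException("Could not serialize element to JSON", e);
            }
        }
    }
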
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriter.java
similarity index 65%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriter.java
index 1a84e02..4e66c1e 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriter.java
@@ -16,35 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.writer;
 
 import com.google.common.collect.Iterables;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
-import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommitter;
+import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/**
- * Accepts incoming elements and creates {@link PinotSinkCommittable}s out of them on request.
- *
- * @param <IN> Type of incoming elements
- */
-@Internal
-public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable, PinotSinkWriterState> {
+public class PinotWriter<IN> implements SinkWriter<IN>,
+        StatefulSink.StatefulSinkWriter<IN, PinotWriterState>,
+        TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, PinotSinkCommittable> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkWriter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PinotWriter.class);
 
     private final int maxRowsPerSegment;
     private final EventTimeExtractor<IN> eventTimeExtractor;
@@ -55,6 +54,9 @@ public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable,
 
     private final int subtaskId;
 
+    private final PinotSinkCommitter committer;
+
     /**
      * @param subtaskId          Subtask id provided by Flink
      * @param maxRowsPerSegment  Maximum number of rows to be stored within a Pinot segment
@@ -62,88 +64,48 @@ public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable,
      * @param jsonSerializer     Serializer used to convert elements to JSON
      * @param fsAdapter          Filesystem adapter used to save files for sharing files across nodes
      */
-    public PinotSinkWriter(int subtaskId, int maxRowsPerSegment,
-                           EventTimeExtractor<IN> eventTimeExtractor,
-                           JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+    public PinotWriter(int subtaskId, int maxRowsPerSegment,
+                       EventTimeExtractor<IN> eventTimeExtractor,
+                       JsonSerializer<IN> jsonSerializer,
+                       FileSystemAdapter fsAdapter,
+                       Collection<PinotWriterState> states,
+                       PinotSinkCommitter committer) {
         this.subtaskId = subtaskId;
         this.maxRowsPerSegment = maxRowsPerSegment;
         this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
         this.jsonSerializer = checkNotNull(jsonSerializer);
         this.fsAdapter = checkNotNull(fsAdapter);
         this.activeSegments = new ArrayList<>();
+        this.committer = committer;
+
+        if (states.size() == 1) {
+            initializeState(states.iterator().next());
+        } else if (states.size() > 1) {
+            throw new IllegalStateException("Did not expect more than one element in states.");
+        }
     }
 
-    /**
-     * Takes elements from an upstream tasks and writes them into {@link PinotWriterSegment}
-     *
-     * @param element Object from upstream task
-     * @param context SinkWriter context
-     * @throws IOException
-     */
     @Override
-    public void write(IN element, Context context) throws IOException {
+    public void write(IN element, Context context) throws IOException, InterruptedException {
         final PinotWriterSegment<IN> inProgressSegment = getOrCreateInProgressSegment();
         inProgressSegment.write(element, eventTimeExtractor.getEventTime(element, context));
     }
 
-    /**
-     * Creates {@link PinotSinkCommittable}s from elements previously received via {@link #write}.
-     * If flush is set, all {@link PinotWriterSegment}s are transformed into
-     * {@link PinotSinkCommittable}s. If flush is not set, only currently non-active
-     * {@link PinotSinkCommittable}s are transformed into {@link PinotSinkCommittable}s.
-     * To convert a {@link PinotWriterSegment} into a {@link PinotSinkCommittable} the data gets
-     * written to the shared filesystem. Moreover, minimum and maximum timestamps are identified.
-     * Finally, all {@link PinotWriterSegment}s transformed into {@link PinotSinkCommittable}s are
-     * removed from {@link #activeSegments}.
-     *
-     * @param flush Flush all currently known elements into the {@link PinotSinkCommittable}s
-     * @return List of {@link PinotSinkCommittable} to process in {@link org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter}
-     * @throws IOException
-     */
     @Override
-    public List<PinotSinkCommittable> prepareCommit(boolean flush) throws IOException {
-        // Identify segments to commit. If the flush argument is set all segments shall be committed.
-        // Otherwise, take only those PinotWriterSegments that do not accept any more elements.
-        List<PinotWriterSegment<IN>> segmentsToCommit = activeSegments.stream()
-                .filter(s -> flush || !s.acceptsElements())
-                .collect(Collectors.toList());
-        LOG.debug("Identified {} segments to commit [subtaskId={}]", segmentsToCommit.size(), subtaskId);
-
-        LOG.debug("Creating committables... [subtaskId={}]", subtaskId);
-        List<PinotSinkCommittable> committables = new ArrayList<>();
-        for (final PinotWriterSegment<IN> segment : segmentsToCommit) {
-            committables.add(segment.prepareCommit());
+    public void flush(boolean flush) throws IOException, InterruptedException {
+        if (flush) {
+            Collection<PinotSinkCommittable> committables = prepareCommittables(flush);
+            committer.commitSink(committables);
         }
-        LOG.debug("Created {} committables [subtaskId={}]", committables.size(), subtaskId);
-
-        // Remove all PinotWriterSegments that will be emitted within the committables.
-        activeSegments.removeAll(segmentsToCommit);
-        return committables;
     }
 
-    /**
-     * Gets the {@link PinotWriterSegment} still accepting elements or creates a new one.
-     *
-     * @return {@link PinotWriterSegment} accepting at least one more element
-     */
-    private PinotWriterSegment<IN> getOrCreateInProgressSegment() {
-        final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
-        if (latestSegment == null || !latestSegment.acceptsElements()) {
-            final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
-            activeSegments.add(inProgressSegment);
-            return inProgressSegment;
-        }
-        return latestSegment;
+    @Override
+    public Collection<PinotSinkCommittable> prepareCommit() throws IOException, InterruptedException {
+        return prepareCommittables(false);
     }
 
-    /**
-     * Snapshots the latest PinotWriterSegment (if existent), so that the contained (and not yet
-     * committed) elements can be recovered later on in case of a failure.
-     *
-     * @return A list containing at most one PinotSinkWriterState
-     */
     @Override
-    public List<PinotSinkWriterState> snapshotState() {
+    public List<PinotWriterState> snapshotState(long l) throws IOException {
         final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
         if (latestSegment == null || !latestSegment.acceptsElements()) {
             return new ArrayList<>();
@@ -155,10 +117,10 @@ public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable,
     /**
      * Initializes the writer according to a previously taken snapshot.
      *
-     * @param state PinotSinkWriterState extracted from snapshot
+     * @param state PinotWriterState extracted from snapshot
      */
-    public void initializeState(PinotSinkWriterState state) {
-        if (activeSegments.size() != 0) {
+    private void initializeState(PinotWriterState state) {
+        if (!activeSegments.isEmpty()) {
             throw new IllegalStateException("Please call the initialization before creating the first PinotWriterSegment.");
         }
         // Create a new PinotWriterSegment and recover its state from the given PinotSinkWriterState
@@ -167,10 +129,47 @@ public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable,
         activeSegments.add(inProgressSegment);
     }
 
+    @Override
+    public void close() throws Exception {
+
+    }
+
     /**
-     * Empty method, as we do not open any connections.
+     * Gets the {@link PinotWriterSegment} still accepting elements or creates a new one.
+     *
+     * @return {@link PinotWriterSegment} accepting at least one more element
      */
-    @Override
-    public void close() {
+    private PinotWriterSegment<IN> getOrCreateInProgressSegment() {
+        final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+        if (latestSegment == null || !latestSegment.acceptsElements()) {
+            final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
+            activeSegments.add(inProgressSegment);
+            return inProgressSegment;
+        }
+        return latestSegment;
     }
+
+    private List<PinotSinkCommittable> prepareCommittables(boolean flush) throws IOException {
+        if (activeSegments.isEmpty()) return Collections.emptyList();
+
+        // Identify segments to commit. If the flush argument is set all segments shall be committed.
+        // Otherwise, take only those PinotWriterSegments that do not accept any more elements.
+        List<PinotWriterSegment<IN>> segmentsToCommit = activeSegments.stream()
+                .filter(s -> flush || !s.acceptsElements())
+                .collect(Collectors.toList());
+        LOG.debug("Identified {} segments to commit [subtaskId={}]", segmentsToCommit.size(), subtaskId);
+
+        LOG.debug("Creating committables... [subtaskId={}]", subtaskId);
+        List<PinotSinkCommittable> committables = new ArrayList<>();
+        for (final PinotWriterSegment<IN> segment : segmentsToCommit) {
+            committables.add(segment.prepareCommit());
+        }
+        LOG.debug("Created {} committables [subtaskId={}]", committables.size(), subtaskId);
+
+        // Remove all PinotWriterSegments that will be emitted within the committables.
+        activeSegments.removeAll(segmentsToCommit);
+        return committables;
+    }
+
+
 }
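
For context, the rewritten writer targets Flink's Sink V2 interfaces from
org.apache.flink.api.connector.sink2: write() is called once per record,
prepareCommit() at every checkpoint to hand committables to the committer,
snapshotState(checkpointId) to persist the still-open buffer, and
flush(endOfInput) before the final commit. A minimal, self-contained sketch of
that contract follows; the BufferingWriter class and its String committables
are illustrative only and not part of this commit:

    import org.apache.flink.api.connector.sink2.StatefulSink;
    import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;

    // Buffers records and emits one committable per checkpoint.
    public class BufferingWriter implements
            StatefulSink.StatefulSinkWriter<String, List<String>>,
            TwoPhaseCommittingSink.PrecommittingSinkWriter<String, List<String>> {

        private final List<String> buffer = new ArrayList<>();

        @Override
        public void write(String element, Context context) {
            // invoked for every incoming record between checkpoints
            buffer.add(element);
        }

        @Override
        public Collection<List<String>> prepareCommit() {
            // invoked on each checkpoint: emit buffered data as committables
            if (buffer.isEmpty()) {
                return Collections.emptyList();
            }
            List<String> committable = new ArrayList<>(buffer);
            buffer.clear();
            return Collections.singletonList(committable);
        }

        @Override
        public List<List<String>> snapshotState(long checkpointId) {
            // writer state that must survive a failover (the in-progress buffer)
            return Collections.singletonList(new ArrayList<>(buffer));
        }

        @Override
        public void flush(boolean endOfInput) {
            // endOfInput is true on the final checkpoint, e.g. in BATCH execution
        }

        @Override
        public void close() {
            buffer.clear();
        }
    }
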
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterSegment.java
similarity index 94%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterSegment.java
index 50be145..80e3157 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterSegment.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.writer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.v2.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
 import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
 
 import java.io.IOException;
@@ -143,11 +143,11 @@ public class PinotWriterSegment<IN> implements Serializable {
      *
      * @return List of elements currently stored within the {@link PinotWriterSegment}
      */
-    public PinotSinkWriterState snapshotState() {
+    public PinotWriterState snapshotState() {
         if (!acceptsElements()) {
             throw new IllegalStateException("Snapshots can only be created of in-progress segments.");
         }
 
-        return new PinotSinkWriterState(serializedElements, minTimestamp, maxTimestamp);
+        return new PinotWriterState(serializedElements, minTimestamp, maxTimestamp);
     }
 }
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterState.java
similarity index 85%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterState.java
index 0e23e2f..88e5b54 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterState.java
@@ -16,18 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot.writer;
+package org.apache.flink.streaming.connectors.pinot.v2.writer;
 
 import java.io.Serializable;
 import java.util.List;
 
-public class PinotSinkWriterState implements Serializable {
+public class PinotWriterState implements Serializable {
 
     private final List<String> serializedElements;
     private final long minTimestamp;
     private final long maxTimestamp;
 
-    public PinotSinkWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
+    public PinotWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
         this.serializedElements = serializedElements;
         this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterStateSerializer.java
similarity index 78%
rename from flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java
rename to flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterStateSerializer.java
index 6dc7efa..fe97bc9 100644
--- a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/v2/writer/PinotWriterStateSerializer.java
@@ -16,21 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot.serializer;
+package org.apache.flink.streaming.connectors.pinot.v2.writer;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
 
 import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Serializer for {@link PinotSinkWriterState}
+ * Serializer for {@link PinotWriterState}
  */
 @Internal
-public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer<PinotSinkWriterState> {
+public class PinotWriterStateSerializer implements SimpleVersionedSerializer<PinotWriterState> {
 
     private static final int CURRENT_VERSION = 1;
 
@@ -40,7 +39,7 @@ public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer
     }
 
     @Override
-    public byte[] serialize(PinotSinkWriterState writerState) throws IOException {
+    public byte[] serialize(PinotWriterState writerState) throws IOException {
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
              DataOutputStream out = new DataOutputStream(baos)) {
             out.writeLong(writerState.getMinTimestamp());
@@ -57,7 +56,7 @@ public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer
     }
 
     @Override
-    public PinotSinkWriterState deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
+    public PinotWriterState deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
         switch (version) {
             case 1:
                 return deserializeV1(serialized);
@@ -66,7 +65,7 @@ public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer
         }
     }
 
-    private PinotSinkWriterState deserializeV1(byte[] serialized) throws IOException {
+    private PinotWriterState deserializeV1(byte[] serialized) throws IOException {
         try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
              DataInputStream in = new DataInputStream(bais)) {
             long minTimestamp = in.readLong();
@@ -77,7 +76,7 @@ public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer
             for (int i = 0; i < size; i++) {
                 serializedElements.add(in.readUTF());
             }
-            return new PinotSinkWriterState(serializedElements, minTimestamp, maxTimestamp);
+            return new PinotWriterState(serializedElements, minTimestamp, maxTimestamp);
         }
     }
 }
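
Since PinotWriterStateSerializer keeps the SimpleVersionedSerializer contract,
a snapshot taken by the writer can be round-tripped as sketched below; the
JSON payloads and the PinotWriterStateRoundTrip class name are made up for
illustration:

    import org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriterState;
    import org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriterStateSerializer;

    import java.io.IOException;
    import java.util.Arrays;

    public class PinotWriterStateRoundTrip {
        public static void main(String[] args) throws IOException {
            PinotWriterStateSerializer serializer = new PinotWriterStateSerializer();

            // state as the writer snapshots it: buffered JSON rows plus min/max event time
            PinotWriterState original = new PinotWriterState(
                    Arrays.asList("{\"col1\":\"a\"}", "{\"col1\":\"b\"}"), 1000L, 2000L);

            byte[] bytes = serializer.serialize(original);
            PinotWriterState restored = serializer.deserialize(serializer.getVersion(), bytes);
            // restored carries the same serialized elements and timestamps as original
        }
    }
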
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotClusterContainer.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotClusterContainer.java
new file mode 100644
index 0000000..448c064
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotClusterContainer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.com.google.common.net.HostAndPort;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import static java.lang.String.format;
+import static org.testcontainers.utility.DockerImageName.parse;
+
+@Testcontainers
+public class PinotClusterContainer implements Closeable {
+
+    private static final String PINOT_IMAGE_NAME = "apachepinot/pinot:0.6.0";
+    private static final String ZOOKEEPER_IMAGE_NAME = "zookeeper:latest";
+
+    private static final String ZOOKEEPER_INTERNAL_HOST = "zookeeper";
+
+    private static final int ZOOKEEPER_PORT = 2181;
+    private static final int CONTROLLER_PORT = 9000;
+    private static final int BROKER_PORT = 8099;
+    private static final int SERVER_ADMIN_PORT = 8097;
+    private static final int SERVER_PORT = 8098;
+    private static final int GRPC_PORT = 8090;
+
+    @Container
+    private final GenericContainer<?> controller;
+    @Container
+    private final GenericContainer<?> broker;
+    @Container
+    private final GenericContainer<?> server;
+    @Container
+    private final GenericContainer<?> zookeeper;
+
+    public PinotClusterContainer() {
+        Network network = Network.newNetwork();
+
+        zookeeper = new GenericContainer<>(parse(ZOOKEEPER_IMAGE_NAME))
+                .withStartupAttempts(3)
+                .withNetwork(network)
+                .withNetworkAliases(ZOOKEEPER_INTERNAL_HOST)
+                .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT))
+                .withExposedPorts(ZOOKEEPER_PORT);
+
+        controller = new GenericContainer<>(parse(PINOT_IMAGE_NAME))
+                .withStartupAttempts(3)
+                .withNetwork(network)
+                .withClasspathResourceMapping("/pinot-controller", "/var/pinot/controller/config", BindMode.READ_ONLY)
+                .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-controller-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
+                .withCommand("StartController", "-configFileName", "/var/pinot/controller/config/pinot-controller.conf")
+                .withNetworkAliases("pinot-controller", "localhost")
+                .withExposedPorts(CONTROLLER_PORT);
+
+        broker = new GenericContainer<>(parse(PINOT_IMAGE_NAME))
+                .withStartupAttempts(3)
+                .withNetwork(network)
+                .withClasspathResourceMapping("/pinot-broker", "/var/pinot/broker/config", BindMode.READ_ONLY)
+                .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-broker-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
+                .withCommand("StartBroker", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/broker/config/pinot-broker.conf")
+                .withNetworkAliases("pinot-broker", "localhost")
+                .withExposedPorts(BROKER_PORT);
+
+        server = new GenericContainer<>(parse(PINOT_IMAGE_NAME))
+                .withStartupAttempts(3)
+                .withNetwork(network)
+                .withClasspathResourceMapping("/pinot-server", "/var/pinot/server/config", BindMode.READ_ONLY)
+                .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-server-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
+                .withCommand("StartServer", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/server/config/pinot-server.conf")
+                .withNetworkAliases("pinot-server", "localhost")
+                .withExposedPorts(SERVER_PORT, SERVER_ADMIN_PORT, GRPC_PORT);
+    }
+
+    public void start() {
+        zookeeper.start();
+        controller.start();
+        broker.start();
+        server.start();
+    }
+
+    public void stop() {
+        server.stop();
+        broker.stop();
+        controller.stop();
+        zookeeper.stop();
+    }
+
+    private String getZookeeperInternalHostPort() {
+        return format("%s:%s", ZOOKEEPER_INTERNAL_HOST, ZOOKEEPER_PORT);
+    }
+
+    public HostAndPort getControllerHostAndPort() {
+        return HostAndPort.fromParts(controller.getHost(), controller.getMappedPort(CONTROLLER_PORT));
+    }
+
+    public HostAndPort getBrokerHostAndPort() {
+        return HostAndPort.fromParts(broker.getHost(), broker.getMappedPort(BROKER_PORT));
+    }
+
+    public HostAndPort getServerHostAndPort() {
+        return HostAndPort.fromParts(server.getHost(), server.getMappedPort(SERVER_PORT));
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        stop();
+    }
+}
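
The new cluster container is Closeable, so a test can bring up ZooKeeper,
controller, broker and server with try-with-resources and read the mapped
ports afterwards. A hedged usage sketch (the PinotClusterSmokeTest class is
illustrative only; starting four containers can take a while):

    import org.apache.flink.streaming.connectors.pinot.PinotClusterContainer;
    import org.testcontainers.shaded.com.google.common.net.HostAndPort;

    public class PinotClusterSmokeTest {
        public static void main(String[] args) throws Exception {
            try (PinotClusterContainer pinot = new PinotClusterContainer()) {
                pinot.start();

                HostAndPort controller = pinot.getControllerHostAndPort();
                HostAndPort broker = pinot.getBrokerHostAndPort();

                // tests point PinotTestHelper and the sink builder at these mapped ports
                System.out.println("Controller at " + controller + ", broker at " + broker);
            }
        }
    }
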
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
index 73a4403..aeec3dc 100644
--- a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
@@ -156,7 +156,11 @@ public class PinotTestHelper implements Closeable {
             return pinotResultSetGroup.getResultSet(0);
         } finally {
             if (brokerConnection != null) {
-                brokerConnection.close();
+                try {
+                    brokerConnection.close();
+                } catch (Exception e) {
+                    LOG.warn(e.getLocalizedMessage());
+                }
             }
         }
     }
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkTest.java
similarity index 90%
rename from flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
rename to flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkTest.java
index 8649eeb..9ae5a76 100644
--- a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotSinkTest.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,27 +16,27 @@
  * limitations under the License.
  */
 
-
-package org.apache.flink.streaming.connectors.pinot;
+package org.apache.flink.streaming.connectors.pinot.v2;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.pinot.LocalFileSystemAdapter;
 import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
-import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
 import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
 import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
 import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.v2.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
 import org.apache.pinot.client.PinotClientException;
 import org.apache.pinot.client.ResultSet;
 import org.junit.jupiter.api.Assertions;
@@ -54,10 +55,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-/**
- * E2e tests for Pinot Sink using BATCH and STREAMING execution mode
- */
-public class PinotSinkTest extends PinotTestBase {
+class PinotSinkTest extends PinotTestBase {
 
     private static final int MAX_ROWS_PER_SEGMENT = 5;
     private static final long STREAMING_CHECKPOINTING_INTERVAL = 50;
@@ -157,7 +155,7 @@ public class PinotSinkTest extends PinotTestBase {
     public void testFailureRecoveryInStreamingSink() throws Exception {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        env.setParallelism(1);
+        env.setParallelism(2);
         env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
 
         List<String> rawData = getRawTestData(20);
@@ -193,7 +191,7 @@ public class PinotSinkTest extends PinotTestBase {
      * @return resulting data stream
      */
     private DataStream<SingleColumnTableRow> setupStreamingDataSource(StreamExecutionEnvironment env, List<String> rawDataValues) {
-        StreamingSource source = new StreamingSource.Builder(rawDataValues, 10).build();
+        PinotSinkTest.StreamingSource source = new PinotSinkTest.StreamingSource.Builder(rawDataValues, 10).build();
         return env.addSource(source)
                 .name("Test input");
     }
@@ -207,7 +205,7 @@ public class PinotSinkTest extends PinotTestBase {
      * @return resulting data stream
      */
     private DataStream<SingleColumnTableRow> setupFailingStreamingDataSource(StreamExecutionEnvironment env, List<String> rawDataValues, int failOnceAtNthElement) {
-        StreamingSource source = new StreamingSource.Builder(rawDataValues, 10)
+        PinotSinkTest.StreamingSource source = new PinotSinkTest.StreamingSource.Builder(rawDataValues, 10)
                 .raiseFailureOnce(failOnceAtNthElement)
                 .build();
         return env.addSource(source)
@@ -257,9 +255,9 @@ public class PinotSinkTest extends PinotTestBase {
         FileSystemAdapter fsAdapter = new LocalFileSystemAdapter(tempDirPrefix);
         JsonSerializer<SingleColumnTableRow> jsonSerializer = new SingleColumnTableRowSerializer();
 
-        EventTimeExtractor<SingleColumnTableRow> eventTimeExtractor = new SingleColumnTableRowEventTimeExtractor();
+        EventTimeExtractor<SingleColumnTableRow> eventTimeExtractor = new PinotSinkTest.SingleColumnTableRowEventTimeExtractor();
 
-        PinotSink<SingleColumnTableRow> sink = new PinotSink.Builder<SingleColumnTableRow>(getPinotHost(), getPinotControllerPort(), getTableName())
+        PinotSink<SingleColumnTableRow> sink = new PinotSinkBuilder<SingleColumnTableRow>(getPinotHost(), getPinotControllerPort(), getTableName())
                 .withMaxRowsPerSegment(MAX_ROWS_PER_SEGMENT)
                 .withTempDirectoryPrefix(tempDirPrefix)
                 .withJsonSerializer(jsonSerializer)
@@ -405,9 +403,9 @@ public class PinotSinkTest extends PinotTestBase {
          * to Pinot by then, as we require {@link #failOnceAtNthElement} to be greater than
          * {@link #MAX_ROWS_PER_SEGMENT} (at a parallelism of 1). This allows to check whether the
          * snapshot creation and failure recovery in
-         * {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter} works properly,
+         * {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriter} works properly,
          * respecting the already committed elements and those that are stored in an active
-         * {@link org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment}. Committed
+         * {@link org.apache.flink.streaming.connectors.pinot.v2.writer.PinotWriterSegment}. Committed
          * elements must not be saved to the snapshot while those in an active segment must be saved
          * to the snapshot in order to enable later-on recovery.
          *
@@ -460,15 +458,15 @@ public class PinotSinkTest extends PinotTestBase {
                 this.sleepDurationMs = sleepDurationMs;
             }
 
-            public Builder raiseFailureOnce(int failOnceAtNthElement) {
+            public PinotSinkTest.StreamingSource.Builder raiseFailureOnce(int failOnceAtNthElement) {
                 checkArgument(failOnceAtNthElement > MAX_ROWS_PER_SEGMENT,
                         "failOnceAtNthElement (if set) is required to be larger than the number of elements per segment (MAX_ROWS_PER_SEGMENT).");
                 this.failOnceAtNthElement = failOnceAtNthElement;
                 return this;
             }
 
-            public StreamingSource build() {
-                return new StreamingSource(rawDataValues, sleepDurationMs, failOnceAtNthElement);
+            public PinotSinkTest.StreamingSource build() {
+                return new PinotSinkTest.StreamingSource(rawDataValues, sleepDurationMs, failOnceAtNthElement);
             }
         }
     }
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotTestBase.java
similarity index 73%
rename from flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
rename to flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotTestBase.java
index a5f5021..d9d7bf0 100644
--- a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/v2/PinotTestBase.java
@@ -16,11 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.pinot;
+package org.apache.flink.streaming.connectors.pinot.v2;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.PinotClusterContainer;
+import org.apache.flink.streaming.connectors.pinot.PinotTestHelper;
+import org.apache.flink.streaming.connectors.pinot.v2.external.JsonSerializer;
 import org.apache.flink.util.TestLogger;
 import org.apache.pinot.spi.config.table.*;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -28,61 +29,29 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
-import java.time.Duration;
 
 /**
  * Base class for PinotSink e2e tests
  */
-@Testcontainers
 public class PinotTestBase extends TestLogger {
 
     protected static final Logger LOG = LoggerFactory.getLogger(PinotTestBase.class);
 
-    private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0";
-    private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000;
-    private static final Integer PINOT_INTERNAL_CONTROLLER_PORT = 9000;
-
+    private static final PinotClusterContainer pinotCluster = new PinotClusterContainer();
     protected static TableConfig TABLE_CONFIG;
     protected static final Schema TABLE_SCHEMA = PinotTableConfig.getTableSchema();
     protected static PinotTestHelper pinotHelper;
 
-    /**
-     * Creates the Pinot testcontainer. We delay the start of tests until Pinot has started all
-     * internal components. This is identified through a log statement.
-     */
-    @Container
-    public static GenericContainer<?> pinot = new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))
-            .withCommand("QuickStart", "-type", "batch")
-            .withExposedPorts(PINOT_INTERNAL_BROKER_PORT, PINOT_INTERNAL_CONTROLLER_PORT)
-            .waitingFor(
-                    // Wait for controller, server and broker instances to be available
-                    new HttpWaitStrategy()
-                            .forPort(PINOT_INTERNAL_CONTROLLER_PORT)
-                            .forPath("/instances")
-                            .forStatusCode(200)
-                            .forResponsePredicate(res -> {
-                                try {
-                                    JsonNode instances = JsonUtils.stringToJsonNode(res).get("instances");
-                                    // Expect 3 instances to be up and running (controller, broker and server)
-                                    return instances.size() == 3;
-                                } catch (IOException e) {
-                                    LOG.error("Error while reading json response in HttpWaitStrategy.", e);
-                                }
-                                return false;
-                            })
-                            // Allow Pinot to take up to 180s for starting up
-                            .withStartupTimeout(Duration.ofSeconds(180))
-            );
+    @BeforeAll
+    public static void setUpAll() {
+        pinotCluster.start();
+    }
 
     /**
      * Creates a new instance of the {@link PinotTestHelper} using the testcontainer port mappings
@@ -122,7 +91,7 @@ public class PinotTestBase extends TestLogger {
      * @return Pinot container host
      */
     protected String getPinotHost() {
-        return pinot.getHost();
+        return pinotCluster.getControllerHostAndPort().getHostText();
     }
 
 
@@ -132,7 +101,7 @@ public class PinotTestBase extends TestLogger {
      * @return Pinot controller port
      */
     protected String getPinotControllerPort() {
-        return pinot.getMappedPort(PINOT_INTERNAL_CONTROLLER_PORT).toString();
+        return String.valueOf(pinotCluster.getControllerHostAndPort().getPort());
     }
 
     /**
@@ -141,7 +110,7 @@ public class PinotTestBase extends TestLogger {
      * @return Pinot broker port
      */
     private String getPinotBrokerPort() {
-        return pinot.getMappedPort(PINOT_INTERNAL_BROKER_PORT).toString();
+        return String.valueOf(pinotCluster.getBrokerHostAndPort().getPort());
     }
 
     /**
@@ -175,6 +144,14 @@ public class PinotTestBase extends TestLogger {
         public void setTimestamp(Long timestamp) {
             this._timestamp = timestamp;
         }
+
+        @Override
+        public String toString() {
+            return "SingleColumnTableRow{" +
+                    "_col1='" + _col1 + '\'' +
+                    ", _timestamp=" + _timestamp +
+                    '}';
+        }
     }
 
     /**
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/log4j.properties
index 570d6f6..82631a4 100644
--- a/flink-connector-pinot/src/test/resources/log4j.properties
+++ b/flink-connector-pinot/src/test/resources/log4j.properties
@@ -25,3 +25,16 @@ log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.out
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+
+status=warn
+appender.console.type=Console
+appender.console.name=LogToConsole
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} - %highlight{%5p} %style{%logger{36}}{cyan} - %m%n%throwable
+
+### Logger Apache Flink ###
+logger.apacheFlink.name=org.apache.flink.streaming.connectors.pinot
+logger.apacheFlink.level=debug
+logger.apacheFlink.additivity=false
+logger.apacheFlink.appenderRef.console.ref=LogToConsole
diff --git a/flink-connector-pinot/src/test/resources/log4j2-test.properties b/flink-connector-pinot/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..330e112
--- /dev/null
+++ b/flink-connector-pinot/src/test/resources/log4j2-test.properties
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+status=WARN
+appender.console.type=Console
+appender.console.name=LogToConsole
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} - %highlight{%5p} %style{%logger{36}}{cyan} - %m%n%throwable
+### Logger test containers ###
+logger.testContainers.name=org.testcontainers
+logger.testContainers.level=WARN
+logger.testContainers.additivity=false
+logger.testContainers.appenderRef.console.ref=LogToConsole
+### Logger Docker Java ###
+logger.dockerJava.name=com.github.dockerjava
+logger.dockerJava.level=WARN
+logger.dockerJava.additivity=false
+logger.dockerJava.appenderRef.console.ref=LogToConsole
+### Logger Apache Flink ###
+logger.apacheFlink.name=org.apache.flink
+logger.apacheFlink.level=WARN
+logger.apacheFlink.additivity=false
+logger.apacheFlink.appenderRef.console.ref=LogToConsole
+### Logger Apache Streaming Connectors ###
+logger.streamingConnectors.name=org.apache.flink.streaming.connectors
+logger.streamingConnectors.level=WARN
+logger.streamingConnectors.additivity=false
+logger.streamingConnectors.appenderRef.console.ref=LogToConsole
+# Root Logger
+rootLogger.level=OFF
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/pinot-broker/pinot-broker.conf
similarity index 71%
copy from flink-connector-pinot/src/test/resources/log4j.properties
copy to flink-connector-pinot/src/test/resources/pinot-broker/pinot-broker.conf
index 570d6f6..3cd74a2 100644
--- a/flink-connector-pinot/src/test/resources/log4j.properties
+++ b/flink-connector-pinot/src/test/resources/pinot-broker/pinot-broker.conf
@@ -16,12 +16,6 @@
 # limitations under the License.
 ################################################################################
 
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.out
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+pinot.broker.client.queryPort=8099
+pinot.broker.routing.table.builder.class=random
+pinot.set.instance.id.to.hostname=true
\ No newline at end of file
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/pinot-controller/pinot-controller.conf
similarity index 71%
copy from flink-connector-pinot/src/test/resources/log4j.properties
copy to flink-connector-pinot/src/test/resources/pinot-controller/pinot-controller.conf
index 570d6f6..d075e99 100644
--- a/flink-connector-pinot/src/test/resources/log4j.properties
+++ b/flink-connector-pinot/src/test/resources/pinot-controller/pinot-controller.conf
@@ -16,12 +16,10 @@
 # limitations under the License.
 ################################################################################
 
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.out
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+controller.helix.cluster.name=pinot
+controller.host=pinot-controller
+controller.port=9000
+controller.data.dir=/var/pinot/controller/data/data
+controller.local.temp.dir=/var/pinot/controller/data
+controller.zk.str=zookeeper:2181
+pinot.set.instance.id.to.hostname=true
\ No newline at end of file
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/pinot-server/pinot-server.conf
similarity index 71%
copy from flink-connector-pinot/src/test/resources/log4j.properties
copy to flink-connector-pinot/src/test/resources/pinot-server/pinot-server.conf
index 570d6f6..219427a 100644
--- a/flink-connector-pinot/src/test/resources/log4j.properties
+++ b/flink-connector-pinot/src/test/resources/pinot-server/pinot-server.conf
@@ -16,12 +16,9 @@
 # limitations under the License.
 ################################################################################
 
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.out
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+pinot.server.netty.port=8098
+pinot.server.adminapi.port=8097
+pinot.server.instance.dataDir=/var/pinot/server/data/index
+pinot.server.instance.segmentTarDir=/var/pinot/server/data/segment
+pinot.set.instance.id.to.hostname=true
+pinot.server.grpc.enable=true
\ No newline at end of file
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 9b04ac8..374c288 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -60,11 +60,11 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-java</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-java</artifactId>
             <type>test-jar</type>
         </dependency>
         <dependency>
@@ -73,7 +73,7 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -85,7 +85,7 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <artifactId>flink-test-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>redis.clients</groupId>
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
index 2bbbcb7..ea5a722 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
@@ -27,7 +27,7 @@ import static org.apache.flink.util.NetUtils.getAvailablePort;
 
 public abstract class RedisITCaseBase extends AbstractTestBase {
 
-    public static final int REDIS_PORT = getAvailablePort();
+    public static final int REDIS_PORT = getAvailablePort().getPort();
     public static final String REDIS_HOST = "127.0.0.1";
 
     private static RedisServer redisServer;
diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
index 33dca44..3a7c86e 100644
--- a/flink-library-siddhi/pom.xml
+++ b/flink-library-siddhi/pom.xml
@@ -91,11 +91,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-java</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <artifactId>flink-test-utils</artifactId>
         </dependency>
     </dependencies>
 
diff --git a/pom.xml b/pom.xml
index 5d65561..8bfd3ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
         <maven.compiler.target>${java.version}</maven.compiler.target>
 
         <!-- Flink version -->
-        <flink.version>1.14.6</flink.version>
+        <flink.version>1.15.3</flink.version>
         <scala.binary.version>2.12</scala.binary.version>
         <scala.version>2.12.8</scala.version>
 
@@ -123,7 +123,7 @@
         <mockito.version>1.10.19</mockito.version>
         <scalacheck.version>1.14.0</scalacheck.version>
         <scalatest.version>3.0.5</scalatest.version>
-        <testcontainers.version>1.16.2</testcontainers.version>
+        <testcontainers.version>1.17.6</testcontainers.version>
 
         <!-- JvmArg execution -->
         <PermGen>64m</PermGen>
@@ -139,7 +139,7 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
-                <artifactId>flink-clients_${scala.binary.version}</artifactId>
+                <artifactId>flink-clients</artifactId>
                 <version>${flink.version}</version>
                 <scope>test</scope>
             </dependency>
@@ -177,12 +177,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
-                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                <artifactId>flink-streaming-java</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
-                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                <artifactId>flink-streaming-java</artifactId>
                 <version>${flink.version}</version>
                 <type>test-jar</type>
                 <scope>test</scope>
@@ -199,7 +199,7 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
-                <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+                <artifactId>flink-table-api-java-bridge</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
@@ -216,7 +216,7 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
-                <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                <artifactId>flink-test-utils</artifactId>
                 <version>${flink.version}</version>
                 <scope>test</scope>
             </dependency>