Posted to commits@bahir.apache.org by es...@apache.org on 2021/04/07 11:19:15 UTC

[bahir-flink] branch master updated: [BAHIR-269] Apache Pinot Connector Sink (#113)

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bd63cb1  [BAHIR-269] Apache Pinot Connector Sink (#113)
bd63cb1 is described below

commit bd63cb1576a0fef3b973cccf4896f123ea1dc3e1
Author: mschroederi <me...@matspoerschke.de>
AuthorDate: Wed Apr 7 13:19:06 2021 +0200

    [BAHIR-269] Apache Pinot Connector Sink (#113)
---
 distribution/pom.xml                               |   5 +
 flink-connector-pinot/README.md                    | 122 ++++++
 .../images/PinotSinkGlobalCommitter_combine.png    | Bin 0 -> 306316 bytes
 .../docs/images/PinotSinkWriter.png                | Bin 0 -> 197924 bytes
 .../docs/images/PinotSinkWriter_prepareCommit.png  | Bin 0 -> 253074 bytes
 flink-connector-pinot/pom.xml                      | 196 +++++++++
 .../connectors/pinot/PinotControllerClient.java    | 142 ++++++
 .../pinot/PinotControllerHttpClient.java           | 134 ++++++
 .../streaming/connectors/pinot/PinotSink.java      | 376 ++++++++++++++++
 .../pinot/committer/PinotSinkCommittable.java      |  59 +++
 .../committer/PinotSinkGlobalCommittable.java      |  59 +++
 .../pinot/committer/PinotSinkGlobalCommitter.java  | 470 ++++++++++++++++++++
 .../exceptions/PinotControllerApiException.java    |  34 ++
 .../pinot/external/EventTimeExtractor.java         |  51 +++
 .../connectors/pinot/external/JsonSerializer.java  |  32 ++
 .../pinot/filesystem/FileSystemAdapter.java        |  55 +++
 .../pinot/filesystem/FileSystemUtils.java          |  64 +++
 .../name/PinotSinkSegmentNameGenerator.java        |  30 ++
 .../segment/name/SimpleSegmentNameGenerator.java   |  62 +++
 .../serializer/PinotSinkCommittableSerializer.java |  71 +++
 .../PinotSinkGlobalCommittableSerializer.java      |  83 ++++
 .../serializer/PinotSinkWriterStateSerializer.java |  83 ++++
 .../connectors/pinot/writer/PinotSinkWriter.java   | 176 ++++++++
 .../pinot/writer/PinotSinkWriterState.java         |  47 ++
 .../pinot/writer/PinotWriterSegment.java           | 153 +++++++
 .../connectors/pinot/LocalFileSystemAdapter.java   |  79 ++++
 .../streaming/connectors/pinot/PinotSinkTest.java  | 475 +++++++++++++++++++++
 .../streaming/connectors/pinot/PinotTestBase.java  | 251 +++++++++++
 .../connectors/pinot/PinotTestHelper.java          | 168 ++++++++
 .../src/test/resources/log4j.properties            |  27 ++
 pom.xml                                            |   1 +
 31 files changed, 3505 insertions(+)

diff --git a/distribution/pom.xml b/distribution/pom.xml
index b4834b1..89d21d5 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -54,6 +54,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.bahir</groupId>
+            <artifactId>flink-connector-pinot_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.bahir</groupId>
             <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/flink-connector-pinot/README.md b/flink-connector-pinot/README.md
new file mode 100644
index 0000000..2044e00
--- /dev/null
+++ b/flink-connector-pinot/README.md
@@ -0,0 +1,122 @@
+# Flink Pinot Connector
+
+This connector provides a sink to [Apache Pinot](http://pinot.apache.org/)™.  
+To use this connector, add the following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-pinot_2.11</artifactId>
+      <version>1.1-SNAPSHOT</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with Pinot 0.6.0.
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html).
+
+The sink class is called `PinotSink`.
+
+## Architecture
+The Pinot sink stores elements from upstream Flink tasks in an Apache Pinot table.
+We support two execution modes:
+* `RuntimeExecutionMode.BATCH`
+* `RuntimeExecutionMode.STREAMING`, which requires checkpointing to be enabled (see the sketch below).
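+
+The execution mode is taken from the surrounding Flink job. A minimal sketch of selecting the mode and enabling checkpointing (the checkpoint interval below is an illustrative value, not a connector default):
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// BATCH mode works without checkpointing:
+//   env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+// In STREAMING mode the sink commits segments on checkpoints, so checkpointing must be enabled:
+env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+env.enableCheckpointing(60_000L); // checkpoint every 60 seconds
+```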
+
+### PinotSinkWriter
+Whenever the sink receives elements from upstream tasks, they are handled by an instance of the `PinotSinkWriter`.
+The `PinotSinkWriter` holds a list of `PinotWriterSegment`s where each `PinotWriterSegment` is capable of storing `maxRowsPerSegment` elements.
+As long as the maximum number of elements has not yet been reached, a `PinotWriterSegment` is considered active.
+Once the maximum number of elements is reached, the active `PinotWriterSegment` is deactivated and a new, empty `PinotWriterSegment` is created.
+
+<img height="225" alt="PinotSinkWriter" src="docs/images/PinotSinkWriter.png">
+
+Thus, there is always one active `PinotWriterSegment` that new incoming elements are written to.
+Over time, the list of `PinotWriterSegment`s per `PinotSinkWriter` grows until a checkpoint is created.
+
+**Checkpointing**  
+On checkpoint creation, `PinotSinkWriter.prepareCommit` is called by the Flink environment.
+This triggers the creation of `PinotSinkCommittable`s, where each inactive `PinotWriterSegment` yields exactly one `PinotSinkCommittable`.
+
+<img height="250" alt="PinotSinkWriter prepareCommit" src="docs/images/PinotSinkWriter_prepareCommit.png">
+
+In order to create a `PinotSinkCommittable`, a file containing a `PinotWriterSegment`'s elements is stored on the shared filesystem defined via the `FileSystemAdapter`.
+The file contains a list of elements in JSON format. The serialization is done via the `JsonSerializer`.
+A `PinotSinkCommittable` then holds the path to the data file on the shared filesystem as well as the minimum and maximum timestamp of all contained elements (extracted via the `EventTimeExtractor`).
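+
+As an illustration, minimal implementations for a hypothetical `PinotRow` POJO might look like the sketch below. The abstract method names `toJson` and `getEventTime`, as well as the exact base types, are assumptions and not confirmed by this README; see `JsonSerializer.java` and `EventTimeExtractor.java` in this module for the actual contracts (`getTimeColumn()` and `getSegmentTimeUnit()` are required by the sink for segment creation).
+
+```java
+// Sketch only: assumes JsonSerializer exposes toJson(IN) and EventTimeExtractor exposes
+// getEventTime(IN, context); check the connector sources for the real signatures.
+public class PinotRowJsonSerializer extends JsonSerializer<PinotRow> {
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Override
+    public String toJson(PinotRow row) {
+        try {
+            return MAPPER.writeValueAsString(row); // one JSON object per element
+        } catch (JsonProcessingException e) {
+            throw new IllegalStateException("Could not serialize row", e);
+        }
+    }
+}
+
+public class PinotRowEventTimeExtractor implements EventTimeExtractor<PinotRow> {
+    @Override
+    public long getEventTime(PinotRow row, SinkWriter.Context context) {
+        return row.getTimestamp(); // event time in milliseconds
+    }
+
+    @Override
+    public String getTimeColumn() {
+        return "timestamp"; // name of the time column in the Pinot table schema
+    }
+
+    @Override
+    public TimeUnit getSegmentTimeUnit() {
+        return TimeUnit.MILLISECONDS;
+    }
+}
+```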
+
+
+### PinotSinkGlobalCommitter
+In order to follow the guidelines for Pinot segment naming, we need to include the minimum and maximum timestamp of the contained elements in a segment's metadata and in its name.
+The minimum and maximum timestamp of all elements between two checkpoints is determined at a parallelism of 1 in the `PinotSinkGlobalCommitter`.
+This procedure allows recovering from failures by deleting previously uploaded segments, which prevents duplicate segments in the Pinot table.
+
+<img height="250" alt="PinotSinkGlobalCommitter combine" src="docs/images/PinotSinkGlobalCommitter_combine.png">
+
+After all `PinotSinkWriter` subtasks have emitted their `PinotSinkCommittable`s, these are sent to the `PinotSinkGlobalCommitter`, which first combines all collected `PinotSinkCommittable`s into a single `PinotSinkGlobalCommittable`.
+To do so, the minimum and maximum timestamps over all collected `PinotSinkCommittable`s are determined.
+Moreover, the `PinotSinkGlobalCommittable` holds references to all data files from the `PinotSinkCommittable`s.
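+
+Conceptually, the combine step folds the committables' timestamps and collects their data file paths. A simplified sketch (not the actual implementation; `committables` stands for the list handed to the global committer):
+
+```java
+List<String> dataFilePaths = new ArrayList<>();
+long minTimestamp = Long.MAX_VALUE;
+long maxTimestamp = Long.MIN_VALUE;
+
+for (PinotSinkCommittable committable : committables) {
+    dataFilePaths.add(committable.getDataFilePath());
+    minTimestamp = Math.min(minTimestamp, committable.getMinTimestamp());
+    maxTimestamp = Math.max(maxTimestamp, committable.getMaxTimestamp());
+}
+
+// One global committable referencing all data files written since the last checkpoint
+PinotSinkGlobalCommittable globalCommittable =
+        new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp);
+```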
+
+When a `PinotSinkGlobalCommittable` is finally committed, the following procedure is executed:
+* Read all data files from the shared filesystem (using the `FileSystemAdapter`).
+* Generate Pinot segment names using the `PinotSinkSegmentNameGenerator`.
+* Create Pinot segments with the minimum and maximum timestamps (stored in the `PinotSinkGlobalCommittable`) and the previously generated segment names assigned.
+* Upload the Pinot segments to the Pinot controller.
+
+
+## Delivery Guarantees
+The architecture described above gives the `PinotSink` an at-least-once delivery guarantee.
+While the failure recovery mechanism ensures that duplicate segments are prevented, there might be temporary inconsistencies in the Pinot table which can result in downstream tasks receiving an element multiple times.
+
+## Options
+| Option                 | Description                                                                       |
+| ---------------------- | --------------------------------------------------------------------------------- |
+| `pinotControllerHost`  | Host of the Pinot controller                                                      |
+| `pinotControllerPort`  | Port of the Pinot controller                                                      |
+| `tableName`            | Target Pinot table's name                                                         |
+| `maxRowsPerSegment`    | Maximum number of rows to be stored within a Pinot segment                        |
+| `tempDirPrefix`        | Prefix for temp directories used                                                  |
+| `jsonSerializer`       | Serializer used to convert elements to JSON                                       |
+| `eventTimeExtractor`   | Defines the way event times are extracted from received objects                   |
+| `segmentNameGenerator` | Pinot segment name generator                                                      |
+| `fsAdapter`            | Filesystem adapter used to share data files across nodes                          |
+| `numCommitThreads`     | Number of threads used in the `PinotSinkGlobalCommitter` for committing segments  |
+
+## Usage
+```java
+StreamExecutionEnvironment env = ...
+// Checkpointing needs to be enabled when executing in STREAMING mode
+env.enableCheckpointing(long interval);
+
+DataStream<PinotRow> dataStream = ...
+PinotSink<PinotRow> pinotSink = new PinotSink.Builder<PinotRow>(String pinotControllerHost, String pinotControllerPort, String tableName)
+
+    // Serializes a PinotRow to JSON format
+    .withJsonSerializer(JsonSerializer<PinotRow> jsonSerializer)
+
+    // Extracts the timestamp from a PinotRow
+    .withEventTimeExtractor(EventTimeExtractor<PinotRow> eventTimeExtractor)
+
+    // Defines the segment name generation via the predefined SimpleSegmentNameGenerator
+    // Exemplary segment name: tableName_minTimestamp_maxTimestamp_segmentNamePostfix_0
+    .withSimpleSegmentNameGenerator(String segmentNamePostfix)
+
+    // Use a custom segment name generator if the SimpleSegmentNameGenerator does not work for your use case
+    .withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator)
+
+    // Use a custom filesystem adapter.
+    // CAUTION: Make sure all nodes your Flink app runs on can access the shared filesystem via the provided FileSystemAdapter
+    .withFileSystemAdapter(FileSystemAdapter fsAdapter)
+
+    // Defines the size of the Pinot segments
+    .withMaxRowsPerSegment(int maxRowsPerSegment)
+
+    // Prefix within the local filesystem's temp directory used for storing intermediate files
+    .withTempDirectoryPrefix(String tempDirPrefix)
+
+    // Number of threads used in the PinotSinkGlobalCommitter to commit a batch of segments
+    // Optional - Default is 4
+    .withNumCommitThreads(int numCommitThreads)
+
+    // Builds the PinotSink
+    .build();
+
+dataStream.sinkTo(pinotSink);
+```
diff --git a/flink-connector-pinot/docs/images/PinotSinkGlobalCommitter_combine.png b/flink-connector-pinot/docs/images/PinotSinkGlobalCommitter_combine.png
new file mode 100644
index 0000000..e9ea878
Binary files /dev/null and b/flink-connector-pinot/docs/images/PinotSinkGlobalCommitter_combine.png differ
diff --git a/flink-connector-pinot/docs/images/PinotSinkWriter.png b/flink-connector-pinot/docs/images/PinotSinkWriter.png
new file mode 100644
index 0000000..389988b
Binary files /dev/null and b/flink-connector-pinot/docs/images/PinotSinkWriter.png differ
diff --git a/flink-connector-pinot/docs/images/PinotSinkWriter_prepareCommit.png b/flink-connector-pinot/docs/images/PinotSinkWriter_prepareCommit.png
new file mode 100644
index 0000000..ed4a095
Binary files /dev/null and b/flink-connector-pinot/docs/images/PinotSinkWriter_prepareCommit.png differ
diff --git a/flink-connector-pinot/pom.xml b/flink-connector-pinot/pom.xml
new file mode 100644
index 0000000..74eacd5
--- /dev/null
+++ b/flink-connector-pinot/pom.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-pinot_2.11</artifactId>
+    <name>flink-connector-pinot</name>
+
+
+    <packaging>jar</packaging>
+
+    <!-- Allow users to pass custom connector versions -->
+    <properties>
+        <pinot.version>0.6.0</pinot.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- This is used to interact with Pinot Controller -->
+        <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-tools</artifactId>
+            <version>${pinot.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpmime</artifactId>
+            <version>4.5.13</version>
+        </dependency>
+
+        <!-- This is used to interact with Pinot Broker -->
+        <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-java-client</artifactId>
+            <version>${pinot.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.15.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>1.15.2</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.4.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce-versions</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <configuration>
+                            <rules>
+                                <requireJavaVersion>
+                                    <version>${java.version}</version>
+                                </requireJavaVersion>
+                                <bannedDependencies>
+                                    <excludes combine.self="override">
+                                        <!-- The org.codehaus.groovy dependency is required by org.apache.pinot:pinot-tools -->
+                                    </excludes>
+                                    <searchTransitive>true</searchTransitive>
+                                </bannedDependencies>
+                            </rules>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.1.1</version>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.1.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>reference.conf</resource>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerClient.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerClient.java
new file mode 100644
index 0000000..43aafab
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerClient.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+@Internal
+public class PinotControllerClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotControllerClient.class);
+    private final PinotControllerHttpClient httpClient;
+
+    /**
+     * @param controllerHost Pinot controller's host
+     * @param controllerPort Pinot controller's port
+     */
+    public PinotControllerClient(String controllerHost, String controllerPort) {
+        this.httpClient = new PinotControllerHttpClient(controllerHost, controllerPort);
+    }
+
+    /**
+     * Checks whether the provided segment name is registered with the given table.
+     *
+     * @param tableName   Target table's name
+     * @param segmentName Segment name to check
+     * @return True if segment with the provided name exists
+     * @throws IOException
+     */
+    public boolean tableHasSegment(String tableName, String segmentName) throws IOException {
+        PinotControllerHttpClient.ApiResponse res = httpClient.get(String.format("/tables/%s/%s/metadata", tableName, segmentName));
+
+        if (res.statusLine.getStatusCode() == 200) {
+            // A segment named `segmentName` exists within the table named `tableName`
+            return true;
+        }
+        if (res.statusLine.getStatusCode() == 404) {
+            // There is no such segment named `segmentName` within the table named `tableName`
+            // (or the table named `tableName` does not exist)
+            return false;
+        }
+
+        // Received an unexpected status code
+        throw new PinotControllerApiException(res.responseBody);
+    }
+
+    /**
+     * Deletes a segment from a table.
+     *
+     * @param tableName   Target table's name
+     * @param segmentName Identifies the segment to delete
+     * @throws IOException
+     */
+    public void deleteSegment(String tableName, String segmentName) throws IOException {
+        PinotControllerHttpClient.ApiResponse res = httpClient.delete(String.format("/tables/%s/%s", tableName, segmentName));
+
+        if (res.statusLine.getStatusCode() != 200) {
+            LOG.error("Could not delete segment {} from table {}. Pinot controller returned: {}", segmentName, tableName, res.responseBody);
+            throw new PinotControllerApiException(res.responseBody);
+        }
+    }
+
+    /**
+     * Fetches a Pinot table's schema via the Pinot controller API.
+     *
+     * @param tableName Target table's name
+     * @return Pinot table schema
+     * @throws IOException
+     */
+    public Schema getSchema(String tableName) throws IOException {
+        Schema schema;
+        PinotControllerHttpClient.ApiResponse res = httpClient.get(String.format("/tables/%s/schema", tableName));
+        LOG.debug("Get schema request for table {} returned {}", tableName, res.responseBody);
+
+        if (res.statusLine.getStatusCode() != 200) {
+            throw new PinotControllerApiException(res.responseBody);
+        }
+
+        try {
+            schema = JsonUtils.stringToObject(res.responseBody, Schema.class);
+        } catch (IOException e) {
+            throw new PinotControllerApiException("Caught exception while reading schema from Pinot Controller's response: " + res.responseBody);
+        }
+        LOG.debug("Retrieved schema: {}", schema.toSingleLineJsonString());
+        return schema;
+    }
+
+    /**
+     * Fetches a Pinot table's configuration via the Pinot controller API.
+     *
+     * @param tableName Target table's name
+     * @return Pinot table configuration
+     * @throws IOException
+     */
+    public TableConfig getTableConfig(String tableName) throws IOException {
+        TableConfig tableConfig;
+        PinotControllerHttpClient.ApiResponse res = httpClient.get(String.format("/tables/%s", tableName));
+        LOG.debug("Get table config request for table {} returned {}", tableName, res.responseBody);
+
+        try {
+            String tableConfigAsJson = JsonUtils.stringToJsonNode(res.responseBody).get("OFFLINE").toString();
+            tableConfig = JsonUtils.stringToObject(tableConfigAsJson, TableConfig.class);
+        } catch (IOException e) {
+            throw new PinotControllerApiException("Caught exception while reading table config from Pinot Controller's response: " + res.responseBody);
+        }
+        LOG.debug("Retrieved table config: {}", tableConfig.toJsonString());
+        return tableConfig;
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        httpClient.close();
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java
new file mode 100644
index 0000000..6ac05d6
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+@Internal
+public class PinotControllerHttpClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotControllerHttpClient.class);
+    private final String controllerHostPort;
+    private final CloseableHttpClient httpClient;
+
+    /**
+     * @param controllerHost Pinot controller's host
+     * @param controllerPort Pinot controller's port
+     */
+    public PinotControllerHttpClient(String controllerHost, String controllerPort) {
+        checkNotNull(controllerHost);
+        checkNotNull(controllerPort);
+        this.controllerHostPort = String.format("http://%s:%s", controllerHost, controllerPort);
+        this.httpClient = HttpClients.createDefault();
+    }
+
+    /**
+     * Issues a request to the Pinot controller API.
+     *
+     * @param request Request to issue
+     * @return Api response
+     * @throws IOException
+     */
+    private ApiResponse execute(HttpRequestBase request) throws IOException {
+        ApiResponse result;
+
+        try (CloseableHttpResponse response = httpClient.execute(request)) {
+            String body = EntityUtils.toString(response.getEntity());
+            result = new ApiResponse(response.getStatusLine(), body);
+        }
+
+        return result;
+    }
+
+    /**
+     * Issues a POST request to the Pinot controller API.
+     *
+     * @param path Path to POST to
+     * @param body Request's body
+     * @return API response
+     * @throws IOException
+     */
+    ApiResponse post(String path, String body) throws IOException {
+        HttpPost httppost = new HttpPost(this.controllerHostPort + path);
+        httppost.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
+        LOG.debug("Posting string entity {} to {}", body, path);
+        return this.execute(httppost);
+    }
+
+    /**
+     * Issues a GET request to the Pinot controller API.
+     *
+     * @param path Path to GET from
+     * @return API response
+     * @throws IOException
+     */
+    ApiResponse get(String path) throws IOException {
+        HttpGet httpget = new HttpGet(this.controllerHostPort + path);
+        LOG.debug("Sending GET request to {}", path);
+        return this.execute(httpget);
+    }
+
+    /**
+     * Issues a DELETE request to the Pinot controller API.
+     *
+     * @param path Path to issue DELETE request to
+     * @return API response
+     * @throws IOException
+     */
+    ApiResponse delete(String path) throws IOException {
+        HttpDelete httpdelete = new HttpDelete(this.controllerHostPort + path);
+        LOG.debug("Sending DELETE request to {}", path);
+        return this.execute(httpdelete);
+    }
+
+    @Override
+    public void close() throws IOException {
+        httpClient.close();
+    }
+
+    /**
+     * Helper class for wrapping Pinot controller API responses.
+     */
+    static class ApiResponse {
+        public final StatusLine statusLine;
+        public final String responseBody;
+
+        ApiResponse(StatusLine statusLine, String responseBody) {
+            this.statusLine = statusLine;
+            this.responseBody = responseBody;
+        }
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
new file mode 100644
index 0000000..7d2fe94
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkWriterStateSerializer;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Apache Pinot sink that stores objects from upstream Flink tasks in an Apache Pinot table. The sink
+ * can be operated in {@code RuntimeExecutionMode.STREAMING} or {@code RuntimeExecutionMode.BATCH}
+ * mode. Ensure that checkpointing is enabled when using the sink in streaming mode.
+ *
+ * <p>We advise you to use the provided {@link PinotSink.Builder} to build and configure the
+ * PinotSink. All the communication with the Pinot cluster's table is managed via the Pinot
+ * controller. Thus, you need to provide its host and port as well as the target Pinot table.
+ * The {@link TableConfig} and {@link Schema} are automatically retrieved via the Pinot controller API
+ * and therefore do not need to be provided.
+ *
+ * <p>Whenever an element is received by the sink, it gets stored in a {@link PinotWriterSegment}. A
+ * {@link PinotWriterSegment} represents exactly one segment that will be pushed to the Pinot
+ * cluster later on. Its size is determined by the customizable {@code maxRowsPerSegment} parameter.
+ * Please note that the maximum segment size that can be handled by this sink is limited by the
+ * minimum amount of memory available at each subTask.
+ * Each subTask holds a list of {@link PinotWriterSegment}s of which at most one is active. An
+ * active {@link PinotWriterSegment} is capable of accepting at least one more element. If a
+ * {@link PinotWriterSegment} switches from active to inactive it flushes its
+ * {@code maxRowsPerSegment} elements to disk. The data file is stored in the local filesystem's
+ * temporary directory and contains serialized elements. We use the {@link JsonSerializer} to
+ * serialize elements to JSON.
+ *
+ * <p>On checkpointing, all {@link PinotWriterSegment}s that are not in progress are transformed into
+ * committables. As the data files need to be shared across nodes, the sink requires access to a
+ * shared filesystem. We use the {@link FileSystemAdapter} for that purpose.
+ * A {@link FileSystemAdapter} is capable of copying a file from the local to the shared filesystem
+ * and vice-versa. A {@link PinotSinkCommittable} contains a reference to a data file on the shared
+ * filesystem as well as the minimum and maximum timestamp contained in the data file. A timestamp -
+ * usually the event time - is extracted from each received element via {@link EventTimeExtractor}.
+ * The timestamps are later on required to follow the guideline for naming Pinot segments.
+ * The state of a possibly existing in-progress {@link PinotWriterSegment} is saved in the snapshot
+ * taken when checkpointing. This ensures that the at-least-once delivery guarantee can be fulfilled
+ * when recovering from failures.
+ *
+ * <p>We use the {@link PinotSinkGlobalCommitter} to collect all created
+ * {@link PinotSinkCommittable}s, create segments from the referenced data files and finally push them
+ * to the Pinot table. Therefore, the minimum and maximum timestamp of all
+ * {@link PinotSinkCommittable}s is determined. The segment names are then generated using the
+ * {@link PinotSinkSegmentNameGenerator} which gets the minimum and maximum timestamp as input.
+ * The segment generation starts with downloading the referenced data file from the shared file system
+ * using the provided {@link FileSystemAdapter}. Once this has completed, we use Pinot's
+ * {@link SegmentIndexCreationDriver} to generate the final segment. Each segment is thereby stored
+ * in a temporary directory on the local filesystem. Next, the segment is uploaded to the Pinot
+ * controller using Pinot's {@link UploadSegmentCommand}.
+ *
+ * <p>To ensure that possible failures are handled accordingly, each segment name is checked for
+ * existence within the Pinot cluster before uploading a segment. In case a segment name already
+ * exists, i.e. if the last commit failed partially with some segments already uploaded, the
+ * existing segment is deleted first. When the elements since the last checkpoint are replayed, the
+ * minimum and maximum timestamp of all received elements will be the same. Thus, the same set of
+ * segment names is generated and we can delete previous segments by checking for segment name
+ * presence. Note: The {@link PinotSinkSegmentNameGenerator} must be deterministic. We also provide
+ * the {@link SimpleSegmentNameGenerator}, a simple segment name generator that is suitable for
+ * most use cases.
+ *
+ * <p>Please note that we use the {@link GlobalCommitter} to ensure consistent segment naming. This
+ * comes with performance limitations, as a {@link GlobalCommitter} always runs at a parallelism of 1,
+ * which results in a clear bottleneck at the {@link PinotSinkGlobalCommitter} that does all the
+ * computationally intensive work (i.e. generating and uploading segments). In order to overcome this
+ * issue, we introduce a custom multithreading approach within the {@link PinotSinkGlobalCommitter}
+ * to parallelize the segment creation and upload process.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotSink<IN> implements Sink<IN, PinotSinkCommittable, PinotSinkWriterState, PinotSinkGlobalCommittable> {
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final int maxRowsPerSegment;
+    private final String tempDirPrefix;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final FileSystemAdapter fsAdapter;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final int numCommitThreads;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param maxRowsPerSegment    Maximum number of rows to be stored within a Pinot segment
+     * @param tempDirPrefix        Prefix for temp directories used
+     * @param jsonSerializer       Serializer used to convert elements to JSON
+     * @param eventTimeExtractor   Defines the way event times are extracted from received objects
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter            Filesystem adapter used to save files for sharing files across nodes
+     * @param numCommitThreads     Number of threads used in the {@link PinotSinkGlobalCommitter} for committing segments
+     */
+    private PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName,
+                      int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer,
+                      EventTimeExtractor<IN> eventTimeExtractor,
+                      SegmentNameGenerator segmentNameGenerator, FileSystemAdapter fsAdapter,
+                      int numCommitThreads) {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+
+        checkArgument(maxRowsPerSegment > 0);
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        checkArgument(numCommitThreads > 0);
+        this.numCommitThreads = numCommitThreads;
+    }
+
+    /**
+     * Creates a Pinot sink writer.
+     *
+     * @param context InitContext
+     * @param states  State extracted from snapshot. This list must not have a size larger than 1
+     */
+    @Override
+    public PinotSinkWriter<IN> createWriter(InitContext context, List<PinotSinkWriterState> states) {
+        PinotSinkWriter<IN> writer = new PinotSinkWriter<>(
+                context.getSubtaskId(), maxRowsPerSegment, eventTimeExtractor,
+                jsonSerializer, fsAdapter
+        );
+
+        if (states.size() == 1) {
+            writer.initializeState(states.get(0));
+        } else if (states.size() > 1) {
+            throw new IllegalStateException("Did not expect more than one element in states.");
+        }
+        return writer;
+    }
+
+    /**
+     * The PinotSink does not use a committer. Instead, a global committer is used.
+     *
+     * @return Empty Optional
+     */
+    @Override
+    public Optional<Committer<PinotSinkCommittable>> createCommitter() {
+        return Optional.empty();
+    }
+
+    /**
+     * Creates the global committer.
+     */
+    @Override
+    public Optional<GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable>> createGlobalCommitter() throws IOException {
+        String timeColumnName = eventTimeExtractor.getTimeColumn();
+        TimeUnit segmentTimeUnit = eventTimeExtractor.getSegmentTimeUnit();
+        PinotSinkGlobalCommitter committer = new PinotSinkGlobalCommitter(
+                pinotControllerHost, pinotControllerPort, tableName, segmentNameGenerator,
+                tempDirPrefix, fsAdapter, timeColumnName, segmentTimeUnit, numCommitThreads
+        );
+        return Optional.of(committer);
+    }
+
+    /**
+     * Creates the committables' serializer.
+     */
+    @Override
+    public Optional<SimpleVersionedSerializer<PinotSinkCommittable>> getCommittableSerializer() {
+        return Optional.of(new PinotSinkCommittableSerializer());
+    }
+
+    /**
+     * Creates the global committables' serializer.
+     */
+    @Override
+    public Optional<SimpleVersionedSerializer<PinotSinkGlobalCommittable>> getGlobalCommittableSerializer() {
+        return Optional.of(new PinotSinkGlobalCommittableSerializer());
+    }
+
+    /**
+     * Creates the writer state serializer used to snapshot in-progress {@link PinotWriterSegment}s
+     * when checkpointing.
+     */
+    @Override
+    public Optional<SimpleVersionedSerializer<PinotSinkWriterState>> getWriterStateSerializer() {
+        return Optional.of(new PinotSinkWriterStateSerializer());
+    }
+
+    /**
+     * Builder for configuring a {@link PinotSink}. This is the recommended public API.
+     *
+     * @param <IN> Type of incoming elements
+     */
+    public static class Builder<IN> {
+
+        static final int DEFAULT_COMMIT_THREADS = 4;
+
+        String pinotControllerHost;
+        String pinotControllerPort;
+        String tableName;
+        int maxRowsPerSegment;
+        String tempDirPrefix = "flink-connector-pinot";
+        JsonSerializer<IN> jsonSerializer;
+        EventTimeExtractor<IN> eventTimeExtractor;
+        SegmentNameGenerator segmentNameGenerator;
+        FileSystemAdapter fsAdapter;
+        int numCommitThreads = DEFAULT_COMMIT_THREADS;
+
+        /**
+         * Defines the basic connection parameters.
+         *
+         * @param pinotControllerHost Host of the Pinot controller
+         * @param pinotControllerPort Port of the Pinot controller
+         * @param tableName           Target table's name
+         */
+        public Builder(String pinotControllerHost, String pinotControllerPort, String tableName) {
+            this.pinotControllerHost = pinotControllerHost;
+            this.pinotControllerPort = pinotControllerPort;
+            this.tableName = tableName;
+        }
+
+        /**
+         * Defines the serializer used to serialize elements to JSON format.
+         *
+         * @param jsonSerializer JsonSerializer
+         * @return Builder
+         */
+        public Builder<IN> withJsonSerializer(JsonSerializer<IN> jsonSerializer) {
+            this.jsonSerializer = jsonSerializer;
+            return this;
+        }
+
+        /**
+         * Defines the {@link EventTimeExtractor} used to extract event times from received objects.
+         *
+         * @param eventTimeExtractor EventTimeExtractor
+         * @return Builder
+         */
+        public Builder<IN> withEventTimeExtractor(EventTimeExtractor<IN> eventTimeExtractor) {
+            this.eventTimeExtractor = eventTimeExtractor;
+            return this;
+        }
+
+        /**
+         * Defines the SegmentNameGenerator used to generate names for the segments pushed to Pinot.
+         *
+         * @param segmentNameGenerator SegmentNameGenerator
+         * @return Builder
+         */
+        public Builder<IN> withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
+            this.segmentNameGenerator = segmentNameGenerator;
+            return this;
+        }
+
+        /**
+         * Defines a basic segment name generator which will be used to generate names for the
+         * segments pushed to Pinot.
+         *
+         * @param segmentNamePostfix Postfix which will be appended to the segment name to identify
+         *                           segments coming from this Flink sink
+         * @return Builder
+         */
+        public Builder<IN> withSimpleSegmentNameGenerator(String segmentNamePostfix) {
+            return withSegmentNameGenerator(new SimpleSegmentNameGenerator(tableName, segmentNamePostfix));
+        }
+
+        /**
+         * Defines the FileSystemAdapter used to share data files between the {@link PinotSinkWriter} and
+         * the {@link PinotSinkGlobalCommitter}.
+         *
+         * @param fsAdapter Adapter for interacting with the shared file system
+         * @return Builder
+         */
+        public Builder<IN> withFileSystemAdapter(FileSystemAdapter fsAdapter) {
+            this.fsAdapter = fsAdapter;
+            return this;
+        }
+
+        /**
+         * Defines the segment size via the maximum number of elements per segment.
+         *
+         * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+         * @return Builder
+         */
+        public Builder<IN> withMaxRowsPerSegment(int maxRowsPerSegment) {
+            this.maxRowsPerSegment = maxRowsPerSegment;
+            return this;
+        }
+
+        /**
+         * Defines the path prefix for the files created in a node's local filesystem.
+         *
+         * @param tempDirPrefix Prefix for temp directories used
+         * @return Builder
+         */
+        public Builder<IN> withTempDirectoryPrefix(String tempDirPrefix) {
+            this.tempDirPrefix = tempDirPrefix;
+            return this;
+        }
+
+        /**
+         * Defines the number of threads that shall be used to commit segments in the {@link PinotSinkGlobalCommitter}.
+         *
+         * @param numCommitThreads Number of threads
+         * @return Builder
+         */
+        public Builder<IN> withNumCommitThreads(int numCommitThreads) {
+            this.numCommitThreads = numCommitThreads;
+            return this;
+        }
+
+        /**
+         * Finally builds the {@link PinotSink} according to the configuration.
+         *
+         * @return PinotSink
+         */
+        public PinotSink<IN> build() {
+            return new PinotSink<>(
+                    pinotControllerHost,
+                    pinotControllerPort,
+                    tableName,
+                    maxRowsPerSegment,
+                    tempDirPrefix,
+                    jsonSerializer,
+                    eventTimeExtractor,
+                    segmentNameGenerator,
+                    fsAdapter,
+                    numCommitThreads
+            );
+        }
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java
new file mode 100644
index 0000000..5a8c655
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The PinotSinkCommittable is required for sharing committables with the
+ * {@link PinotSinkGlobalCommitter} instance.
+ */
+@Internal
+public class PinotSinkCommittable implements Serializable {
+    private final String dataFilePath;
+    private final long minTimestamp;
+    private final long maxTimestamp;
+
+    /**
+     * @param dataFilePath Path referencing a file on the shared filesystem defined via {@link org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter}
+     * @param minTimestamp The minimum timestamp of all the elements contained in {@link #dataFilePath}
+     * @param maxTimestamp The maximum timestamp of all the elements contained in {@link #dataFilePath}
+     */
+    public PinotSinkCommittable(String dataFilePath, long minTimestamp, long maxTimestamp) {
+        this.dataFilePath = checkNotNull(dataFilePath);
+        this.minTimestamp = minTimestamp;
+        this.maxTimestamp = maxTimestamp;
+    }
+
+    public String getDataFilePath() {
+        return dataFilePath;
+    }
+
+    public long getMinTimestamp() {
+        return minTimestamp;
+    }
+
+    public long getMaxTimestamp() {
+        return maxTimestamp;
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommittable.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommittable.java
new file mode 100644
index 0000000..766b831
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommittable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Global committable references all data files that will be committed during checkpointing.
+ */
+@Internal
+public class PinotSinkGlobalCommittable implements Serializable {
+    private final List<String> dataFilePaths;
+    private final long minTimestamp;
+    private final long maxTimestamp;
+
+    /**
+     * @param dataFilePaths List of paths to data files on shared file system
+     * @param minTimestamp  Minimum timestamp of all objects in all data files
+     * @param maxTimestamp  Maximum timestamp of all objects in all data files
+     */
+    public PinotSinkGlobalCommittable(List<String> dataFilePaths, long minTimestamp, long maxTimestamp) {
+        this.dataFilePaths = checkNotNull(dataFilePaths);
+        this.minTimestamp = minTimestamp;
+        this.maxTimestamp = maxTimestamp;
+    }
+
+    public List<String> getDataFilePaths() {
+        return dataFilePaths;
+    }
+
+    public long getMinTimestamp() {
+        return minTimestamp;
+    }
+
+    public long getMaxTimestamp() {
+        return maxTimestamp;
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
new file mode 100644
index 0000000..46e03e4
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.PinotControllerClient;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * generates segments and pushes them to the Pinot controller.
+ * Note: We use a custom multithreading approach to parallelize the segment creation and upload to
+ * overcome the performance limitations resulting from using a {@link GlobalCommitter} always
+ * running at a parallelism of 1.
+ */
+@Internal
+public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final FileSystemAdapter fsAdapter;
+    private final String timeColumnName;
+    private final TimeUnit segmentTimeUnit;
+    private final PinotControllerClient pinotControllerClient;
+    private final File tempDirectory;
+    private final Schema tableSchema;
+    private final TableConfig tableConfig;
+    private final ExecutorService pool;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param tempDirPrefix        Prefix for directory to store temporary files in
+     * @param fsAdapter            Adapter for interacting with the shared file system
+     * @param timeColumnName       Name of the column containing the timestamp
+     * @param segmentTimeUnit      Unit of the time column
+     * @param numCommitThreads     Number of threads used to commit the committables
+     */
+    public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort,
+                                    String tableName, SegmentNameGenerator segmentNameGenerator,
+                                    String tempDirPrefix, FileSystemAdapter fsAdapter,
+                                    String timeColumnName, TimeUnit segmentTimeUnit,
+                                    int numCommitThreads) throws IOException {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.timeColumnName = checkNotNull(timeColumnName);
+        this.segmentTimeUnit = checkNotNull(segmentTimeUnit);
+        this.pinotControllerClient = new PinotControllerClient(pinotControllerHost, pinotControllerPort);
+
+        // Create directory that temporary files will be stored in
+        this.tempDirectory = Files.createTempDirectory(tempDirPrefix).toFile();
+
+        // Retrieve the Pinot table schema and the Pinot table config from the Pinot controller
+        this.tableSchema = pinotControllerClient.getSchema(tableName);
+        this.tableConfig = pinotControllerClient.getTableConfig(tableName);
+
+        // We use a thread pool in order to parallelize the segment creation and segment upload
+        checkArgument(numCommitThreads > 0);
+        this.pool = Executors.newFixedThreadPool(numCommitThreads);
+    }
+
+    /**
+     * Identifies global committables that need to be re-committed from a list of recovered committables.
+     *
+     * @param globalCommittables List of global committables that are checked for required re-commit
+     * @return List of global committable that need to be re-committed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> filterRecoveredCommittables(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+        // Holds identified global committables whose commit needs to be retried
+        List<PinotSinkGlobalCommittable> committablesToRetry = new ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
+            CommitStatus commitStatus = getCommitStatus(globalCommittable);
+
+            if (commitStatus.getMissingSegmentNames().isEmpty()) {
+                // All segments were already committed. Thus, we do not need to retry the commit.
+                continue;
+            }
+
+            for (String existingSegment : commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we cannot guarantee that
+                // the data files still contain the same data as when they were originally
+                // committed, we delete the already committed segments and recommit them later on.
+                pinotControllerClient.deleteSegment(tableName, existingSegment);
+            }
+            committablesToRetry.add(globalCommittable);
+        }
+        return committablesToRetry;
+    }
+
+    /**
+     * Combines multiple {@link PinotSinkCommittable}s into one {@link PinotSinkGlobalCommittable}
+     * by finding the minimum and maximum timestamps from the provided {@link PinotSinkCommittable}s.
+     *
+     * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}
+     * @return Global committer committable
+     */
+    @Override
+    public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> committables) {
+        List<String> dataFilePaths = new ArrayList<>();
+        long minTimestamp = Long.MAX_VALUE;
+        long maxTimestamp = Long.MIN_VALUE;
+
+        // Extract all data file paths and the overall minimum and maximum timestamps
+        // from all committables
+        for (PinotSinkCommittable committable : committables) {
+            dataFilePaths.add(committable.getDataFilePath());
+            minTimestamp = Long.min(minTimestamp, committable.getMinTimestamp());
+            maxTimestamp = Long.max(maxTimestamp, committable.getMaxTimestamp());
+        }
+
+        LOG.debug("Combined {} committables into one global committable", committables.size());
+        return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp);
+    }
+
+    /**
+     * Copies data files from the shared filesystem to the local filesystem, generates segments
+     * named according to the segment naming schema and finally pushes the segments to the Pinot
+     * cluster. Before a segment is pushed, the Pinot controller is queried to check whether a
+     * segment with that name already exists in the Pinot cluster; if so, it gets deleted.
+     *
+     * @param globalCommittables List of global committables
+     * @return Global committables whose commit failed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+        // List of failed global committables that can be retried later on
+        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
+            Set<Future<Boolean>> resultFutures = new HashSet<>();
+            // Commit all segments in globalCommittable
+            for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
+                String dataFilePath = globalCommittable.getDataFilePaths().get(sequenceId);
+                // Get segment names with increasing sequenceIds
+                String segmentName = getSegmentName(globalCommittable, sequenceId);
+                // Segment committer handling the whole commit process for a single segment
+                Callable<Boolean> segmentCommitter = new SegmentCommitter(
+                        pinotControllerHost, pinotControllerPort, tempDirectory, fsAdapter,
+                        dataFilePath, segmentName, tableSchema, tableConfig, timeColumnName,
+                        segmentTimeUnit
+                );
+                // Submits the segment committer to the thread pool
+                resultFutures.add(pool.submit(segmentCommitter));
+            }
+
+            boolean commitSucceeded = true;
+            try {
+                for (Future<Boolean> wasSuccessful : resultFutures) {
+                    // In case any of the segment commits wasn't successful we mark the whole
+                    // globalCommittable as failed
+                    if (!wasSuccessful.get()) {
+                        commitSucceeded = false;
+                        failedCommits.add(globalCommittable);
+                        // Once any of the commits failed, we do not need to check the remaining
+                        // ones, as the whole globalCommittable will be retried next time
+                        break;
+                    }
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                // In case of an exception thrown while accessing commit status, mark the whole
+                // globalCommittable as failed
+                failedCommits.add(globalCommittable);
+                LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e);
+            }
+
+            if (commitSucceeded) {
+                // If commit succeeded, cleanup the data files stored on the shared file system. In
+                // case the commit of at least one of the segments failed, nothing will be cleaned
+                // up here to enable retrying failed commits (data files must therefore stay
+                // available on the shared filesystem).
+                for (String path : globalCommittable.getDataFilePaths()) {
+                    fsAdapter.deleteFromSharedFileSystem(path);
+                }
+            }
+        }
+
+        // Return failed commits so that they can be retried later on
+        return failedCommits;
+    }
+
+    /**
+     * Empty method.
+     */
+    @Override
+    public void endOfInput() {
+    }
+
+    /**
+     * Closes the Pinot controller http client, clears the created temporary directory and
+     * shuts the thread pool down.
+     */
+    @Override
+    public void close() throws IOException {
+        pinotControllerClient.close();
+        tempDirectory.delete();
+        pool.shutdown();
+    }
+
+    /**
+     * Helper method for generating segment names using the segment name generator.
+     *
+     * @param globalCommittable Global committable the segment name shall be generated from
+     * @param sequenceId        Incrementing counter
+     * @return generated segment name
+     */
+    private String getSegmentName(PinotSinkGlobalCommittable globalCommittable, int sequenceId) {
+        return segmentNameGenerator.generateSegmentName(sequenceId,
+                globalCommittable.getMinTimestamp(), globalCommittable.getMaxTimestamp());
+    }
+
+    /**
+     * Evaluates the status of already uploaded segments by requesting segment metadata from the
+     * Pinot controller.
+     *
+     * @param globalCommittable Global committable whose commit status gets evaluated
+     * @return Commit status
+     * @throws IOException
+     */
+    private CommitStatus getCommitStatus(PinotSinkGlobalCommittable globalCommittable) throws IOException {
+        List<String> existingSegmentNames = new ArrayList<>();
+        List<String> missingSegmentNames = new ArrayList<>();
+
+        // For all segment names that will be used to submit new segments, check whether the segment
+        // name already exists for the target table
+        for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
+            String segmentName = getSegmentName(globalCommittable, sequenceId);
+            if (pinotControllerClient.tableHasSegment(tableName, segmentName)) {
+                // Segment name already exists
+                existingSegmentNames.add(segmentName);
+            } else {
+                // Segment name does not exist yet
+                missingSegmentNames.add(segmentName);
+            }
+        }
+        return new CommitStatus(existingSegmentNames, missingSegmentNames);
+    }
+
+    /**
+     * Wrapper for existing and missing segments in the Pinot cluster.
+     */
+    private static class CommitStatus {
+        private final List<String> existingSegmentNames;
+        private final List<String> missingSegmentNames;
+
+        CommitStatus(List<String> existingSegmentNames, List<String> missingSegmentNames) {
+            this.existingSegmentNames = existingSegmentNames;
+            this.missingSegmentNames = missingSegmentNames;
+        }
+
+        public List<String> getExistingSegmentNames() {
+            return existingSegmentNames;
+        }
+
+        public List<String> getMissingSegmentNames() {
+            return missingSegmentNames;
+        }
+    }
+
+    /**
+     * Helper class for committing a single segment. Downloads a data file from the shared filesystem,
+     * generates a segment from the data file and uploads the segment to the Pinot controller.
+     */
+    private static class SegmentCommitter implements Callable<Boolean> {
+
+        private static final Logger LOG = LoggerFactory.getLogger(SegmentCommitter.class);
+
+        private final String pinotControllerHost;
+        private final String pinotControllerPort;
+        private final File tempDirectory;
+        private final FileSystemAdapter fsAdapter;
+        private final String dataFilePath;
+        private final String segmentName;
+        private final Schema tableSchema;
+        private final TableConfig tableConfig;
+        private final String timeColumnName;
+        private final TimeUnit segmentTimeUnit;
+
+        /**
+         * @param pinotControllerHost Host of the Pinot controller
+         * @param pinotControllerPort Port of the Pinot controller
+         * @param tempDirectory       Directory to store temporary files in
+         * @param fsAdapter           Filesystem adapter used to load data files from the shared file system
+         * @param dataFilePath        Data file to load from the shared file system
+         * @param segmentName         Name of the segment to create and commit
+         * @param tableSchema         Pinot table schema
+         * @param tableConfig         Pinot table config
+         * @param timeColumnName      Name of the column containing the timestamp
+         * @param segmentTimeUnit     Unit of the time column
+         */
+        SegmentCommitter(String pinotControllerHost, String pinotControllerPort,
+                         File tempDirectory, FileSystemAdapter fsAdapter,
+                         String dataFilePath, String segmentName, Schema tableSchema,
+                         TableConfig tableConfig, String timeColumnName,
+                         TimeUnit segmentTimeUnit) {
+            this.pinotControllerHost = pinotControllerHost;
+            this.pinotControllerPort = pinotControllerPort;
+            this.tempDirectory = tempDirectory;
+            this.fsAdapter = fsAdapter;
+            this.dataFilePath = dataFilePath;
+            this.segmentName = segmentName;
+            this.tableSchema = tableSchema;
+            this.tableConfig = tableConfig;
+            this.timeColumnName = timeColumnName;
+            this.segmentTimeUnit = segmentTimeUnit;
+        }
+
+        /**
+         * Downloads a data file from the shared file system via {@code fsAdapter}, generates a
+         * segment from it and finally uploads the segment to the Pinot controller.
+         *
+         * @return True if the commit succeeded
+         */
+        @Override
+        public Boolean call() {
+            // Local copy of data file stored on the shared filesystem
+            File segmentData = null;
+            // File containing the final Pinot segment
+            File segmentFile = null;
+            try {
+                // Download data file from the shared filesystem
+                LOG.debug("Downloading data file {} from shared file system...", dataFilePath);
+                List<String> serializedElements = fsAdapter.readFromSharedFileSystem(dataFilePath);
+                segmentData = FileSystemUtils.writeToLocalFile(serializedElements, tempDirectory);
+                LOG.debug("Successfully downloaded data file {} from shared file system", dataFilePath);
+
+                segmentFile = FileSystemUtils.createFileInDir(tempDirectory);
+                LOG.debug("Creating segment in " + segmentFile.getAbsolutePath());
+
+                // Creates a segment with name `segmentName` in `segmentFile`
+                generateSegment(segmentData, segmentFile, true);
+
+                // Uploads the recently created segment to the Pinot controller
+                uploadSegment(segmentFile);
+
+                // Commit successful
+                return true;
+            } catch (IOException e) {
+                LOG.error("Error while committing segment data stored on shared filesystem.", e);
+
+                // Commit failed
+                return false;
+            } finally {
+                // Finally cleanup all files created on the local filesystem
+                if (segmentData != null) {
+                    segmentData.delete();
+                }
+                if (segmentFile != null) {
+                    segmentFile.delete();
+                }
+            }
+        }
+
+        /**
+         * Creates a segment from the given parameters.
+         * This method was adapted from {@link org.apache.pinot.tools.admin.command.CreateSegmentCommand}.
+         *
+         * @param dataFile                  File containing the JSON data
+         * @param outDir                    Segment target path
+         * @param _postCreationVerification Verify segment after generation
+         */
+        private void generateSegment(File dataFile, File outDir, Boolean _postCreationVerification) {
+            SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, tableSchema);
+            segmentGeneratorConfig.setSegmentName(segmentName);
+            segmentGeneratorConfig.setSegmentTimeUnit(segmentTimeUnit);
+            segmentGeneratorConfig.setTimeColumnName(timeColumnName);
+            segmentGeneratorConfig.setInputFilePath(dataFile.getPath());
+            segmentGeneratorConfig.setFormat(FileFormat.JSON);
+            segmentGeneratorConfig.setOutDir(outDir.getPath());
+            segmentGeneratorConfig.setTableName(tableConfig.getTableName());
+
+            try {
+                SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+                driver.init(segmentGeneratorConfig);
+                driver.build();
+                File indexDir = new File(outDir, segmentName);
+                LOG.debug("Successfully created segment: {} in directory: {}", segmentName, indexDir);
+                if (_postCreationVerification) {
+                    LOG.debug("Verifying the segment by loading it");
+                    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
+                    LOG.debug("Successfully loaded segment: {} of size: {} bytes", segmentName,
+                            segment.getSegmentSizeBytes());
+                    segment.destroy();
+                }
+            }
+            // SegmentIndexCreationDriverImpl throws generic Exceptions during init and build
+            // ImmutableSegmentLoader throws generic Exception during load
+            catch (Exception e) {
+                String message = String.format("Error while generating segment from file %s", dataFile.getAbsolutePath());
+                LOG.error(message, e);
+                throw new RuntimeException(message, e);
+            }
+            LOG.debug("Successfully created 1 segment from data file: {}", dataFile);
+        }
+
+        /**
+         * Uploads a segment using the Pinot admin tool.
+         *
+         * @param segmentFile File containing the segment to upload
+         * @throws IOException
+         */
+        private void uploadSegment(File segmentFile) throws IOException {
+            try {
+                UploadSegmentCommand cmd = new UploadSegmentCommand();
+                cmd.setControllerHost(pinotControllerHost);
+                cmd.setControllerPort(pinotControllerPort);
+                cmd.setSegmentDir(segmentFile.getAbsolutePath());
+                cmd.execute();
+            } catch (Exception e) {
+                // UploadSegmentCommand.execute() throws generic Exception
+                LOG.error("Could not upload segment {}", segmentFile.getAbsolutePath(), e);
+                throw new IOException(e.getMessage(), e);
+            }
+        }
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/exceptions/PinotControllerApiException.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/exceptions/PinotControllerApiException.java
new file mode 100644
index 0000000..c5283d6
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/exceptions/PinotControllerApiException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.exceptions;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+/**
+ * Pinot controller API exception wrapper
+ */
+@Internal
+public class PinotControllerApiException extends IOException {
+
+    public PinotControllerApiException(String reason) {
+        super(reason);
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/EventTimeExtractor.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/EventTimeExtractor.java
new file mode 100644
index 0000000..fe2647d
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/EventTimeExtractor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.external;
+
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Defines the interface for event time extractors
+ *
+ * @param <IN> Type of incoming elements
+ */
+public interface EventTimeExtractor<IN> extends Serializable {
+
+    /**
+     * Extracts event time from incoming elements.
+     *
+     * @param element Incoming element
+     * @param context Context of SinkWriter
+     * @return timestamp
+     */
+    long getEventTime(IN element, SinkWriter.Context context);
+
+    /**
+     * @return Name of column in Pinot target table that contains the timestamp.
+     */
+    String getTimeColumn();
+
+    /**
+     * @return Unit of the time column in the Pinot target table.
+     */
+    TimeUnit getSegmentTimeUnit();
+}
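
For illustration, a user-defined extractor for the interface above might look like the following sketch. The SensorReading type and its getTimestampMillis() accessor are hypothetical and not part of this commit.

    import org.apache.flink.api.connector.sink.SinkWriter;
    import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;

    import java.util.concurrent.TimeUnit;

    public class SensorReadingTimeExtractor implements EventTimeExtractor<SensorReading> {

        @Override
        public long getEventTime(SensorReading element, SinkWriter.Context context) {
            // Use the event time carried by the element itself
            return element.getTimestampMillis();
        }

        @Override
        public String getTimeColumn() {
            // Column in the target Pinot table that holds the timestamp
            return "timestampMillis";
        }

        @Override
        public TimeUnit getSegmentTimeUnit() {
            return TimeUnit.MILLISECONDS;
        }
    }
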
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/JsonSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/JsonSerializer.java
new file mode 100644
index 0000000..8774ac1
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/JsonSerializer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.external;
+
+import java.io.Serializable;
+
+/**
+ * Defines the interface for serializing incoming elements to JSON format.
+ * The JSON format is expected during Pinot segment creation.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public abstract class JsonSerializer<IN> implements Serializable {
+
+    public abstract String toJson(IN element);
+}
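
As an illustrative sketch, a serializer could delegate to Jackson's ObjectMapper; this assumes Jackson is available on the classpath and reuses the hypothetical SensorReading POJO from the extractor example above.

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;

    public class SensorReadingJsonSerializer extends JsonSerializer<SensorReading> {

        // Kept static so that the non-serializable ObjectMapper does not become
        // part of the serialized sink configuration
        private static final ObjectMapper MAPPER = new ObjectMapper();

        @Override
        public String toJson(SensorReading element) {
            try {
                return MAPPER.writeValueAsString(element);
            } catch (JsonProcessingException e) {
                throw new IllegalStateException("Could not serialize element to JSON", e);
            }
        }
    }
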
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemAdapter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemAdapter.java
new file mode 100644
index 0000000..42610a8
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.filesystem;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Defines the interaction with a shared filesystem. The shared filesystem must be accessible from all
+ * nodes within the cluster that run a partition of the {@link org.apache.flink.streaming.connectors.pinot.PinotSink}.
+ */
+public interface FileSystemAdapter extends Serializable {
+
+    /**
+     * Writes a list of serialized elements to the shared filesystem.
+     *
+     * @param elements List of serialized elements
+     * @return Path identifying the remote file
+     * @throws IOException
+     */
+    String writeToSharedFileSystem(List<String> elements) throws IOException;
+
+    /**
+     * Reads a previously written list of serialized elements from the shared filesystem.
+     *
+     * @param path Path returned by {@link #writeToSharedFileSystem}
+     * @return List of serialized elements read from the shared filesystem
+     * @throws IOException
+     */
+    List<String> readFromSharedFileSystem(String path) throws IOException;
+
+    /**
+     * Deletes a file from the shared filesystem
+     *
+     * @param path Path returned by {@link #writeToSharedFileSystem}
+     * @throws IOException
+     */
+    void deleteFromSharedFileSystem(String path) throws IOException;
+}
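
A minimal adapter sketch backed by a directory that is mounted on every task manager (for example an NFS share) could look as follows; the MountedDirectoryAdapter name and its base-path constructor argument are assumptions made for this example.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.UUID;

    import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;

    public class MountedDirectoryAdapter implements FileSystemAdapter {

        // Directory visible to all nodes running the PinotSink
        private final String baseDir;

        public MountedDirectoryAdapter(String baseDir) {
            this.baseDir = baseDir;
        }

        @Override
        public String writeToSharedFileSystem(List<String> elements) throws IOException {
            // One line per serialized element, stored under a random file name
            Path target = Paths.get(baseDir, UUID.randomUUID() + ".json");
            Files.write(target, elements, StandardCharsets.UTF_8);
            return target.toString();
        }

        @Override
        public List<String> readFromSharedFileSystem(String path) throws IOException {
            return Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
        }

        @Override
        public void deleteFromSharedFileSystem(String path) throws IOException {
            Files.deleteIfExists(Paths.get(path));
        }
    }
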
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemUtils.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemUtils.java
new file mode 100644
index 0000000..b61d9cf
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.UUID;
+
+@Internal
+public class FileSystemUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtils.class);
+
+    /**
+     * Writes a list of serialized elements to a file in the given target directory
+     * on the local filesystem.
+     *
+     * @param elements  List of serialized elements
+     * @param targetDir Directory to create file in
+     * @return File containing the written data
+     * @throws IOException
+     */
+    public static File writeToLocalFile(List<String> elements, File targetDir) throws IOException {
+        File dataFile = createFileInDir(targetDir);
+
+        Files.write(dataFile.toPath(), elements, Charset.defaultCharset());
+        LOG.debug("Successfully written data to file {}", dataFile.getAbsolutePath());
+
+        return dataFile;
+    }
+
+    /**
+     * Creates file with random name in targetDir.
+     *
+     * @param targetDir Directory to create file in
+     * @return New File
+     */
+    public static File createFileInDir(File targetDir) {
+        String fileName = String.format("%s.json", UUID.randomUUID().toString());
+        return new File(targetDir.toString(), fileName);
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/PinotSinkSegmentNameGenerator.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/PinotSinkSegmentNameGenerator.java
new file mode 100644
index 0000000..ee3a908
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/PinotSinkSegmentNameGenerator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.segment.name;
+
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+
+import java.io.Serializable;
+
+/**
+ * Defines the segment name generator interface that is used to generate segment names. The segment
+ * name generator is required to be serializable. We expect users to implement
+ * {@link PinotSinkSegmentNameGenerator} in case they want to define a custom name generator.
+ */
+public interface PinotSinkSegmentNameGenerator extends SegmentNameGenerator, Serializable {
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/SimpleSegmentNameGenerator.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/SimpleSegmentNameGenerator.java
new file mode 100644
index 0000000..666673c
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/SimpleSegmentNameGenerator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.segment.name;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapted from {@link org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator}.
+ * <p>
+ * Simple segment name generator which does not perform time conversion.
+ * <p>
+ * The segment name is generated by joining the following fields with '_', ignoring all {@code null}s.
+ * <ul>
+ *   <li>Table name</li>
+ *   <li>Minimum time value</li>
+ *   <li>Maximum time value</li>
+ *   <li>Segment name postfix</li>
+ *   <li>Sequence id</li>
+ * </ul>
+ */
+public class SimpleSegmentNameGenerator implements PinotSinkSegmentNameGenerator {
+
+    private final String tableName;
+    private final String segmentNamePostfix;
+
+    public SimpleSegmentNameGenerator(String tableName, String segmentNamePostfix) {
+        this.tableName = checkNotNull(tableName);
+        this.segmentNamePostfix = checkNotNull(segmentNamePostfix);
+    }
+
+    @Override
+    public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
+        return JOINER
+                .join(tableName, minTimeValue, maxTimeValue, segmentNamePostfix, sequenceId >= 0 ? sequenceId : null);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder stringBuilder = new StringBuilder("SimpleSegmentNameGenerator: tableName=").append(tableName);
+        if (segmentNamePostfix != null) {
+            stringBuilder.append(", segmentNamePostfix=").append(segmentNamePostfix);
+        }
+        return stringBuilder.toString();
+    }
+}
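
As a quick usage illustration (values chosen arbitrarily), the generator joins the non-null parts with underscores, so a sequence id of 2 and time bounds 1000/2000 for table myTable with postfix flink would be expected to yield a name of the shape shown below.

    SimpleSegmentNameGenerator generator = new SimpleSegmentNameGenerator("myTable", "flink");
    String name = generator.generateSegmentName(2, 1000L, 2000L);
    // Expected to be "myTable_1000_2000_flink_2"
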
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java
new file mode 100644
index 0000000..ed61de2
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+
+import java.io.*;
+
+/**
+ * Serializer for {@link PinotSinkCommittable}
+ */
+@Internal
+public class PinotSinkCommittableSerializer implements SimpleVersionedSerializer<PinotSinkCommittable> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PinotSinkCommittable pinotSinkCommittable) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeLong(pinotSinkCommittable.getMinTimestamp());
+            out.writeLong(pinotSinkCommittable.getMaxTimestamp());
+            out.writeUTF(pinotSinkCommittable.getDataFilePath());
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public PinotSinkCommittable deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
+        switch (version) {
+            case 1:
+                return deserializeV1(serialized);
+            default:
+                throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
+        }
+    }
+
+    private PinotSinkCommittable deserializeV1(byte[] serialized) throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+             DataInputStream in = new DataInputStream(bais)) {
+            long minTimestamp = in.readLong();
+            long maxTimestamp = in.readLong();
+            String dataFilePath = in.readUTF();
+            return new PinotSinkCommittable(dataFilePath, minTimestamp, maxTimestamp);
+        }
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java
new file mode 100644
index 0000000..8e45620
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer for {@link PinotSinkGlobalCommittable}
+ */
+@Internal
+public class PinotSinkGlobalCommittableSerializer implements SimpleVersionedSerializer<PinotSinkGlobalCommittable> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PinotSinkGlobalCommittable pinotSinkGlobalCommittable) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeLong(pinotSinkGlobalCommittable.getMinTimestamp());
+            out.writeLong(pinotSinkGlobalCommittable.getMaxTimestamp());
+
+            int size = pinotSinkGlobalCommittable.getDataFilePaths().size();
+            out.writeInt(size);
+            for (String dataFilePath : pinotSinkGlobalCommittable.getDataFilePaths()) {
+                out.writeUTF(dataFilePath);
+            }
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public PinotSinkGlobalCommittable deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
+        switch (version) {
+            case 1:
+                return deserializeV1(serialized);
+            default:
+                throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
+        }
+    }
+
+    private PinotSinkGlobalCommittable deserializeV1(byte[] serialized) throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+             DataInputStream in = new DataInputStream(bais)) {
+            long minTimestamp = in.readLong();
+            long maxTimestamp = in.readLong();
+
+            int size = in.readInt();
+            List<String> dataFilePaths = new ArrayList<>();
+            for (int i = 0; i < size; i++) {
+                dataFilePaths.add(in.readUTF());
+            }
+            return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp);
+        }
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java
new file mode 100644
index 0000000..6dc7efa
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer for {@link PinotSinkWriterState}
+ */
+@Internal
+public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer<PinotSinkWriterState> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PinotSinkWriterState writerState) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeLong(writerState.getMinTimestamp());
+            out.writeLong(writerState.getMaxTimestamp());
+
+            out.writeInt(writerState.getSerializedElements().size());
+            for (String serialized : writerState.getSerializedElements()) {
+                out.writeUTF(serialized);
+            }
+
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public PinotSinkWriterState deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
+        switch (version) {
+            case 1:
+                return deserializeV1(serialized);
+            default:
+                throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
+        }
+    }
+
+    private PinotSinkWriterState deserializeV1(byte[] serialized) throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+             DataInputStream in = new DataInputStream(bais)) {
+            long minTimestamp = in.readLong();
+            long maxTimestamp = in.readLong();
+
+            int size = in.readInt();
+            List<String> serializedElements = new ArrayList<>();
+            for (int i = 0; i < size; i++) {
+                serializedElements.add(in.readUTF());
+            }
+            return new PinotSinkWriterState(serializedElements, minTimestamp, maxTimestamp);
+        }
+    }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
new file mode 100644
index 0000000..1a84e02
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import com.google.common.collect.Iterables;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Accepts incoming elements and creates {@link PinotSinkCommittable}s out of them on request.
+ *
+ * @param <IN> Type of incoming elements
+ */
+@Internal
+public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable, PinotSinkWriterState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkWriter.class);
+
+    private final int maxRowsPerSegment;
+    private final EventTimeExtractor<IN> eventTimeExtractor;
+    private final JsonSerializer<IN> jsonSerializer;
+
+    private final List<PinotWriterSegment<IN>> activeSegments;
+    private final FileSystemAdapter fsAdapter;
+
+    private final int subtaskId;
+
+    /**
+     * @param subtaskId          Subtask id provided by Flink
+     * @param maxRowsPerSegment  Maximum number of rows to be stored within a Pinot segment
+     * @param eventTimeExtractor Defines the way event times are extracted from received objects
+     * @param jsonSerializer     Serializer used to convert elements to JSON
+     * @param fsAdapter          Filesystem adapter used to save data files for sharing across nodes
+     */
+    public PinotSinkWriter(int subtaskId, int maxRowsPerSegment,
+                           EventTimeExtractor<IN> eventTimeExtractor,
+                           JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+        this.subtaskId = subtaskId;
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.activeSegments = new ArrayList<>();
+    }
+
+    /**
+     * Takes elements from upstream tasks and writes them into a {@link PinotWriterSegment}
+     *
+     * @param element Object from upstream task
+     * @param context SinkWriter context
+     * @throws IOException
+     */
+    @Override
+    public void write(IN element, Context context) throws IOException {
+        final PinotWriterSegment<IN> inProgressSegment = getOrCreateInProgressSegment();
+        inProgressSegment.write(element, eventTimeExtractor.getEventTime(element, context));
+    }
+
+    /**
+     * Creates {@link PinotSinkCommittable}s from elements previously received via {@link #write}.
+     * If flush is set, all {@link PinotWriterSegment}s are transformed into
+     * {@link PinotSinkCommittable}s. If flush is not set, only those {@link PinotWriterSegment}s
+     * that no longer accept elements are transformed into {@link PinotSinkCommittable}s.
+     * To convert a {@link PinotWriterSegment} into a {@link PinotSinkCommittable} the data gets
+     * written to the shared filesystem. Moreover, minimum and maximum timestamps are identified.
+     * Finally, all {@link PinotWriterSegment}s transformed into {@link PinotSinkCommittable}s are
+     * removed from {@link #activeSegments}.
+     *
+     * @param flush Flush all currently known elements into the {@link PinotSinkCommittable}s
+     * @return List of {@link PinotSinkCommittable} to process in {@link org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter}
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkCommittable> prepareCommit(boolean flush) throws IOException {
+        // Identify segments to commit. If the flush argument is set all segments shall be committed.
+        // Otherwise, take only those PinotWriterSegments that do not accept any more elements.
+        List<PinotWriterSegment<IN>> segmentsToCommit = activeSegments.stream()
+                .filter(s -> flush || !s.acceptsElements())
+                .collect(Collectors.toList());
+        LOG.debug("Identified {} segments to commit [subtaskId={}]", segmentsToCommit.size(), subtaskId);
+
+        LOG.debug("Creating committables... [subtaskId={}]", subtaskId);
+        List<PinotSinkCommittable> committables = new ArrayList<>();
+        for (final PinotWriterSegment<IN> segment : segmentsToCommit) {
+            committables.add(segment.prepareCommit());
+        }
+        LOG.debug("Created {} committables [subtaskId={}]", committables.size(), subtaskId);
+
+        // Remove all PinotWriterSegments that will be emitted within the committables.
+        activeSegments.removeAll(segmentsToCommit);
+        return committables;
+    }
+
+    /**
+     * Gets the {@link PinotWriterSegment} still accepting elements or creates a new one.
+     *
+     * @return {@link PinotWriterSegment} accepting at least one more element
+     */
+    private PinotWriterSegment<IN> getOrCreateInProgressSegment() {
+        final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+        if (latestSegment == null || !latestSegment.acceptsElements()) {
+            final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
+            activeSegments.add(inProgressSegment);
+            return inProgressSegment;
+        }
+        return latestSegment;
+    }
+
+    /**
+     * Snapshots the latest PinotWriterSegment (if present), so that the contained (and not yet
+     * committed) elements can be recovered later on in case of a failure.
+     *
+     * @return A list containing at most one PinotSinkWriterState
+     */
+    @Override
+    public List<PinotSinkWriterState> snapshotState() {
+        final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+        if (latestSegment == null || !latestSegment.acceptsElements()) {
+            return new ArrayList<>();
+        }
+
+        return Collections.singletonList(latestSegment.snapshotState());
+    }
+
+    /**
+     * Initializes the writer according to a previously taken snapshot.
+     *
+     * @param state PinotSinkWriterState extracted from snapshot
+     */
+    public void initializeState(PinotSinkWriterState state) {
+        if (!activeSegments.isEmpty()) {
+            throw new IllegalStateException("initializeState() must be called before the first PinotWriterSegment is created.");
+        }
+        // Create a new PinotWriterSegment and recover its state from the given PinotSinkWriterState
+        final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
+        inProgressSegment.initializeState(state.getSerializedElements(), state.getMinTimestamp(), state.getMaxTimestamp());
+        activeSegments.add(inProgressSegment);
+    }
+
+    /**
+     * Empty method, as we do not open any connections.
+     */
+    @Override
+    public void close() {
+    }
+}
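The snapshotState() and initializeState() methods above form the writer's failure-recovery round trip. A minimal sketch of that round trip, using only the two methods shown here: createWriter() and MyRow are hypothetical placeholders, since the PinotSinkWriter constructor is defined earlier in this file and not repeated in this hunk.

    // Sketch only: createWriter() stands in for the constructor defined earlier in PinotSinkWriter.java.
    PinotSinkWriter<MyRow> writer = createWriter();
    // ... write(...) is called for a number of elements ...
    List<PinotSinkWriterState> snapshot = writer.snapshotState();   // contains at most one state

    PinotSinkWriter<MyRow> restored = createWriter();
    if (!snapshot.isEmpty()) {
        // Must be called before the restored writer creates its first PinotWriterSegment
        restored.initializeState(snapshot.get(0));
    }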
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
new file mode 100644
index 0000000..0e23e2f
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class PinotSinkWriterState implements Serializable {
+
+    private final List<String> serializedElements;
+    private final long minTimestamp;
+    private final long maxTimestamp;
+
+    public PinotSinkWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
+        this.serializedElements = serializedElements;
+        this.minTimestamp = minTimestamp;
+        this.maxTimestamp = maxTimestamp;
+    }
+
+    public List<String> getSerializedElements() {
+        return serializedElements;
+    }
+
+    public long getMinTimestamp() {
+        return minTimestamp;
+    }
+
+    public long getMaxTimestamp() {
+        return maxTimestamp;
+    }
+}
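PinotSinkWriterState is a plain serializable value object; the PinotSinkWriterStateSerializer listed in the commit summary (not reproduced in this excerpt) is responsible for turning it into bytes. As an illustration only, and not the serializer shipped in this commit, a state object could be encoded with java.io.DataOutputStream using nothing but the getters shown above:

    // Sketch under assumptions: illustrative encoding of the three fields, not the commit's serializer.
    static byte[] toBytes(PinotSinkWriterState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(state.getMinTimestamp());
            out.writeLong(state.getMaxTimestamp());
            out.writeInt(state.getSerializedElements().size());
            for (String element : state.getSerializedElements()) {
                out.writeUTF(element);
            }
        }
        return bytes.toByteArray();
    }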
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
new file mode 100644
index 0000000..50be145
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link PinotWriterSegment} represents exactly one segment that can be found in the Pinot
+ * cluster once the commit has been completed.
+ *
+ * @param <IN> Type of incoming elements
+ */
+@Internal
+public class PinotWriterSegment<IN> implements Serializable {
+
+    private final int maxRowsPerSegment;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final FileSystemAdapter fsAdapter;
+
+    private boolean acceptsElements = true;
+
+    private final List<String> serializedElements;
+    private String dataPathOnSharedFS;
+    private long minTimestamp = Long.MAX_VALUE;
+    private long maxTimestamp = Long.MIN_VALUE;
+
+    /**
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+     * @param jsonSerializer    Serializer used to convert elements to JSON
+     * @param fsAdapter         Filesystem adapter used to share data files across nodes
+     */
+    PinotWriterSegment(int maxRowsPerSegment, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+        checkArgument(maxRowsPerSegment > 0L);
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.serializedElements = new ArrayList<>();
+    }
+
+    /**
+     * Takes elements and stores them in memory until either {@link #maxRowsPerSegment} is reached
+     * or {@link #prepareCommit} is called.
+     *
+     * @param element   Object from upstream task
+     * @param timestamp Timestamp assigned to element
+     * @throws IOException
+     */
+    public void write(IN element, long timestamp) throws IOException {
+        if (!acceptsElements()) {
+            throw new IllegalStateException("This PinotSegmentWriter does not accept any elements anymore.");
+        }
+        // Store serialized element in serializedElements
+        serializedElements.add(jsonSerializer.toJson(element));
+        minTimestamp = Long.min(minTimestamp, timestamp);
+        maxTimestamp = Long.max(maxTimestamp, timestamp);
+
+        // Write elements to the shared filesystem once the maximum number of rows is reached
+        if (serializedElements.size() == maxRowsPerSegment) {
+            acceptsElements = false;
+            dataPathOnSharedFS = writeToSharedFilesystem();
+            serializedElements.clear();
+        }
+    }
+
+    /**
+     * Writes the collected elements to the shared filesystem defined via {@link FileSystemAdapter}
+     * (unless this has already happened) and creates a {@link PinotSinkCommittable} pointing to that file.
+     *
+     * @return {@link PinotSinkCommittable} pointing to file on shared filesystem
+     * @throws IOException
+     */
+    public PinotSinkCommittable prepareCommit() throws IOException {
+        if (dataPathOnSharedFS == null) {
+            dataPathOnSharedFS = writeToSharedFilesystem();
+        }
+        return new PinotSinkCommittable(dataPathOnSharedFS, minTimestamp, maxTimestamp);
+    }
+
+    /**
+     * Takes elements from {@link #serializedElements} and writes them to the shared filesystem.
+     *
+     * @return Path pointing to just written data on shared filesystem
+     * @throws IOException
+     */
+    private String writeToSharedFilesystem() throws IOException {
+        return fsAdapter.writeToSharedFileSystem(serializedElements);
+    }
+
+    /**
+     * Determines whether this segment can accept at least one more element.
+     *
+     * @return True if at least one more element will be accepted
+     */
+    public boolean acceptsElements() {
+        return acceptsElements;
+    }
+
+    /**
+     * Recovers a previously snapshotted state.
+     *
+     * @param _serializedElements Serialized elements that were received but not yet committed
+     * @param _minTimestamp       Minimum event timestamp of all elements
+     * @param _maxTimestamp       Maximum event timestamp of all elements
+     */
+    public void initializeState(List<String> _serializedElements, long _minTimestamp, long _maxTimestamp) {
+        if (!serializedElements.isEmpty()) {
+            throw new IllegalStateException("Cannot initialize a PinotWriterSegment that has already received elements.");
+        }
+
+        serializedElements.addAll(_serializedElements);
+        minTimestamp = _minTimestamp;
+        maxTimestamp = _maxTimestamp;
+    }
+
+    /**
+     * Snapshots the current state of an active {@link PinotWriterSegment}.
+     *
+     * @return {@link PinotSinkWriterState} holding the elements currently stored within this {@link PinotWriterSegment} together with their minimum and maximum timestamps
+     */
+    public PinotSinkWriterState snapshotState() {
+        if (!acceptsElements()) {
+            throw new IllegalStateException("Snapshots can only be created of in-progress segments.");
+        }
+
+        return new PinotSinkWriterState(serializedElements, minTimestamp, maxTimestamp);
+    }
+}
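A short usage sketch of the segment lifecycle implemented above. The constructor is package-private, so such code would have to live in the writer package; the LocalFileSystemAdapter is the test adapter added further down in this commit, and the inline JsonSerializer is a made-up stand-in. Both write() and prepareCommit() throw IOException.

    // Sketch only; assumes execution inside the org.apache.flink.streaming.connectors.pinot.writer package.
    FileSystemAdapter fsAdapter = new LocalFileSystemAdapter("pinot-segment-demo");
    JsonSerializer<String> serializer = new JsonSerializer<String>() {
        @Override
        public String toJson(String element) {
            return "{\"col1\":\"" + element + "\"}";
        }
    };
    PinotWriterSegment<String> segment = new PinotWriterSegment<>(2, serializer, fsAdapter);
    segment.write("a", 1L);
    segment.write("b", 2L);     // maxRowsPerSegment reached: data is written to the shared filesystem
    // segment.acceptsElements() is now false
    PinotSinkCommittable committable = segment.prepareCommit();   // points at the file on the shared FS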
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/LocalFileSystemAdapter.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/LocalFileSystemAdapter.java
new file mode 100644
index 0000000..069daa3
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/LocalFileSystemAdapter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The LocalFileSystemAdapter is used when sharing files via the local filesystem.
+ * Keep in mind that using this FileSystemAdapter requires running the Flink app on a single node.
+ */
+public class LocalFileSystemAdapter implements FileSystemAdapter {
+
+    private final String tempDirPrefix;
+
+    public LocalFileSystemAdapter(String tempDirPrefix) {
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+    }
+
+    /**
+     * Writes a list of serialized elements to the local filesystem.
+     *
+     * @param elements List of serialized elements
+     * @return Path identifying the written file
+     * @throws IOException
+     */
+    @Override
+    public String writeToSharedFileSystem(List<String> elements) throws IOException {
+        File tempDir = Files.createTempDirectory(tempDirPrefix).toFile();
+        return FileSystemUtils.writeToLocalFile(elements, tempDir).getAbsolutePath();
+    }
+
+    /**
+     * Reads a previously written list of serialized elements from the local filesystem.
+     *
+     * @param path Path returned by {@link #writeToSharedFileSystem}
+     * @return List of serialized elements read from the local filesystem
+     * @throws IOException
+     */
+    @Override
+    public List<String> readFromSharedFileSystem(String path) throws IOException {
+        File dataFile = new File(path);
+        return Files.readAllLines(dataFile.toPath(), Charset.defaultCharset());
+    }
+
+    /**
+     * Deletes a file from the local filesystem
+     *
+     * @param path Path returned by {@link #writeToSharedFileSystem}
+     * @throws IOException
+     */
+    @Override
+    public void deleteFromSharedFileSystem(String path) {
+        new File(path).delete();
+    }
+}
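The LocalFileSystemAdapter above only works if every subtask and the global committer share the same local disk. To illustrate what any other adapter has to provide, here is a hedged sketch of an in-memory implementation (single-JVM tests at most); it is not part of this commit and relies only on the three interface methods overridden above (imports: java.util.*, java.util.concurrent.ConcurrentHashMap).

    // Sketch only: in-memory FileSystemAdapter for single-JVM experiments, not part of this commit.
    public class InMemoryFileSystemAdapter implements FileSystemAdapter {

        private static final Map<String, List<String>> FILES = new ConcurrentHashMap<>();

        @Override
        public String writeToSharedFileSystem(List<String> elements) {
            String path = UUID.randomUUID().toString();
            FILES.put(path, new ArrayList<>(elements));
            return path;
        }

        @Override
        public List<String> readFromSharedFileSystem(String path) {
            return FILES.get(path);
        }

        @Override
        public void deleteFromSharedFileSystem(String path) {
            FILES.remove(path);
        }
    }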
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
new file mode 100644
index 0000000..8649eeb
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.client.PinotClientException;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * E2e tests for the {@link PinotSink} using BATCH and STREAMING execution modes
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+    private static final int MAX_ROWS_PER_SEGMENT = 5;
+    private static final long STREAMING_CHECKPOINTING_INTERVAL = 50;
+    private static final int DATA_CHECKING_TIMEOUT_SECONDS = 60;
+    private static final AtomicBoolean hasFailedOnce = new AtomicBoolean(false);
+    private static CountDownLatch latch;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        super.setUp();
+        // Reset hasFailedOnce flag used during failure recovery testing before each test.
+        hasFailedOnce.set(false);
+        // Reset latch used to keep the generator streaming source up until the test is completed.
+        latch = new CountDownLatch(1);
+    }
+
+    /**
+     * Tests the BATCH execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBatchSink() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(2);
+
+        List<String> rawData = getRawTestData(12);
+        DataStream<SingleColumnTableRow> dataStream = setupBatchDataSource(env, rawData);
+        setupSink(dataStream);
+
+        // Run
+        env.execute();
+
+        // Check for data in Pinot
+        checkForDataInPinotWithRetry(rawData);
+    }
+
+    /**
+     * Tests failure recovery of the {@link PinotSink} using BATCH execution mode.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testFailureRecoveryInBatchingSink() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10));
+        env.setParallelism(2);
+
+        List<String> rawData = getRawTestData(12);
+        DataStream<SingleColumnTableRow> dataStream = setupBatchDataSource(env, rawData);
+        dataStream = setupFailingMapper(dataStream, 8);
+        setupSink(dataStream);
+
+        // Run
+        env.execute();
+
+        // Check for data in Pinot
+        checkForDataInPinotWithRetry(rawData);
+    }
+
+    /**
+     * Tests the STREAMING execution of the {@link PinotSink}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStreamingSink() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(2);
+        env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
+
+        List<String> rawData = getRawTestData(20);
+        DataStream<SingleColumnTableRow> dataStream = setupStreamingDataSource(env, rawData);
+        setupSink(dataStream);
+
+        // Start execution of job
+        env.executeAsync();
+
+        // Check for data in Pinot
+        checkForDataInPinotWithRetry(rawData);
+
+        // Generator source can now shut down
+        latch.countDown();
+    }
+
+    /**
+     * Tests failure recovery of the {@link PinotSink} using STREAMING execution mode.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testFailureRecoveryInStreamingSink() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setParallelism(1);
+        env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
+
+        List<String> rawData = getRawTestData(20);
+        DataStream<SingleColumnTableRow> dataStream = setupFailingStreamingDataSource(env, rawData, 12);
+        setupSink(dataStream);
+
+        // Start execution of job
+        env.executeAsync();
+
+        // Check for data in Pinot
+        checkForDataInPinotWithRetry(rawData);
+
+        // Generator source can now shut down
+        latch.countDown();
+    }
+
+    /**
+     * Generates a small test dataset consisting of {@link SingleColumnTableRow}s.
+     *
+     * @return List of SingleColumnTableRow
+     */
+    private List<String> getRawTestData(int numItems) {
+        return IntStream.range(1, numItems + 1)
+                .mapToObj(num -> "ColValue" + num)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Sets up the data source for STREAMING tests.
+     *
+     * @param env           Stream execution environment
+     * @param rawDataValues Data values to send
+     * @return resulting data stream
+     */
+    private DataStream<SingleColumnTableRow> setupStreamingDataSource(StreamExecutionEnvironment env, List<String> rawDataValues) {
+        StreamingSource source = new StreamingSource.Builder(rawDataValues, 10).build();
+        return env.addSource(source)
+                .name("Test input");
+    }
+
+    /**
+     * Sets up a data source for STREAMING tests that raises an exception once after emitting {@code failOnceAtNthElement} elements.
+     *
+     * @param env                  Stream execution environment
+     * @param rawDataValues        Data values to send
+     * @param failOnceAtNthElement Number of elements to process before raising the exception
+     * @return resulting data stream
+     */
+    private DataStream<SingleColumnTableRow> setupFailingStreamingDataSource(StreamExecutionEnvironment env, List<String> rawDataValues, int failOnceAtNthElement) {
+        StreamingSource source = new StreamingSource.Builder(rawDataValues, 10)
+                .raiseFailureOnce(failOnceAtNthElement)
+                .build();
+        return env.addSource(source)
+                .name("Test input");
+    }
+
+    /**
+     * Sets up the data source for BATCH tests.
+     *
+     * @param env           Stream execution environment
+     * @param rawDataValues Data values to send
+     * @return resulting data stream
+     */
+    private DataStream<SingleColumnTableRow> setupBatchDataSource(StreamExecutionEnvironment env, List<String> rawDataValues) {
+        return env.fromCollection(rawDataValues)
+                .map(value -> new SingleColumnTableRow(value, System.currentTimeMillis()))
+                .name("Test input");
+    }
+
+    /**
+     * Sets up a mapper that fails once when processing the n-th element, with n = failOnceAtNthElement.
+     *
+     * @param dataStream           Input data stream
+     * @param failOnceAtNthElement Number of elements to process before raising the exception
+     * @return resulting data stream
+     */
+    private DataStream<SingleColumnTableRow> setupFailingMapper(DataStream<SingleColumnTableRow> dataStream, int failOnceAtNthElement) {
+        AtomicInteger messageCounter = new AtomicInteger(0);
+
+        return dataStream.map(element -> {
+            if (!hasFailedOnce.get() && messageCounter.incrementAndGet() == failOnceAtNthElement) {
+                hasFailedOnce.set(true);
+                throw new Exception(String.format("Mapper was expected to fail after %d elements", failOnceAtNthElement));
+            }
+            return element;
+        });
+    }
+
+    /**
+     * Attaches the {@link PinotSink} to the provided data stream.
+     *
+     * @param dataStream Data stream to sink into Pinot
+     */
+    private void setupSink(DataStream<SingleColumnTableRow> dataStream) {
+        String tempDirPrefix = "flink-pinot-connector-test";
+        PinotSinkSegmentNameGenerator segmentNameGenerator = new SimpleSegmentNameGenerator(getTableName(), "flink-connector");
+        FileSystemAdapter fsAdapter = new LocalFileSystemAdapter(tempDirPrefix);
+        JsonSerializer<SingleColumnTableRow> jsonSerializer = new SingleColumnTableRowSerializer();
+
+        EventTimeExtractor<SingleColumnTableRow> eventTimeExtractor = new SingleColumnTableRowEventTimeExtractor();
+
+        PinotSink<SingleColumnTableRow> sink = new PinotSink.Builder<SingleColumnTableRow>(getPinotHost(), getPinotControllerPort(), getTableName())
+                .withMaxRowsPerSegment(MAX_ROWS_PER_SEGMENT)
+                .withTempDirectoryPrefix(tempDirPrefix)
+                .withJsonSerializer(jsonSerializer)
+                .withEventTimeExtractor(eventTimeExtractor)
+                .withSegmentNameGenerator(segmentNameGenerator)
+                .withFileSystemAdapter(fsAdapter)
+                .withNumCommitThreads(2)
+                .build();
+
+        // Sink into Pinot
+        dataStream.sinkTo(sink).name("Pinot sink");
+    }
+
+    /**
+     * As Pinot might take some time to index recently pushed segments, the
+     * {@link #checkForDataInPinot} call might need to be retried multiple times. This method
+     * provides a simple wrapper that retries with a constant delay until the timeout is reached.
+     *
+     * @param rawData Data to expect in the Pinot table
+     * @throws InterruptedException
+     */
+    private void checkForDataInPinotWithRetry(List<String> rawData) throws InterruptedException, PinotControllerApiException {
+        long endTime = System.currentTimeMillis() + 1000L * DATA_CHECKING_TIMEOUT_SECONDS;
+        // Allow for at most 10 retries by spreading the retry delay evenly across the timeout
+        long retryDelay = 1000L * DATA_CHECKING_TIMEOUT_SECONDS / 10;
+        while (System.currentTimeMillis() < endTime) {
+            try {
+                checkForDataInPinot(rawData);
+                // In case of no error, we can skip further retries
+                return;
+            } catch (AssertionFailedError | PinotControllerApiException | PinotClientException e) {
+                // In case of an error retry after delay
+                Thread.sleep(retryDelay);
+            }
+        }
+
+        // Check for data in Pinot one final time after DATA_CHECKING_TIMEOUT_SECONDS was exceeded
+        checkForDataInPinot(rawData);
+    }
+
+    /**
+     * Checks whether all expected elements are present in the Pinot target table and whether
+     * the number of rows matches the number of elements sent.
+     *
+     * @param rawData Data to expect in the Pinot table
+     * @throws AssertionFailedError        in case the assertion fails
+     * @throws PinotControllerApiException in case there aren't any rows in the Pinot table
+     */
+    private void checkForDataInPinot(List<String> rawData) throws AssertionFailedError, PinotControllerApiException, PinotClientException {
+        // Now get the result from Pinot and verify if everything is there
+        ResultSet resultSet = pinotHelper.getTableEntries(getTableName(), rawData.size() + 5);
+
+        Assertions.assertEquals(rawData.size(), resultSet.getRowCount(),
+                String.format("Expected %d elements in Pinot but saw %d", rawData.size(), resultSet.getRowCount()));
+
+        // Check output strings
+        List<String> output = IntStream.range(0, resultSet.getRowCount())
+                .mapToObj(i -> resultSet.getString(i, 0))
+                .collect(Collectors.toList());
+
+        for (String test : rawData) {
+            Assertions.assertTrue(output.contains(test), "Missing " + test);
+        }
+    }
+
+    /**
+     * EventTimeExtractor for {@link SingleColumnTableRow} used in e2e tests.
+     * Extracts the timestamp column from {@link SingleColumnTableRow}.
+     */
+    private static class SingleColumnTableRowEventTimeExtractor implements EventTimeExtractor<SingleColumnTableRow> {
+
+        @Override
+        public long getEventTime(SingleColumnTableRow element, SinkWriter.Context context) {
+            return element.getTimestamp();
+        }
+
+        @Override
+        public String getTimeColumn() {
+            return "timestamp";
+        }
+
+        @Override
+        public TimeUnit getSegmentTimeUnit() {
+            return TimeUnit.MILLISECONDS;
+        }
+    }
+
+    /**
+     * Simple source that publishes data and finally waits for {@link #latch}.
+     * By setting {@link #failOnceAtNthElement} > -1, one can define the number of elements to
+     * process before raising an exception. If configured, the exception will only be raised once.
+     */
+    private static class StreamingSource implements SourceFunction<SingleColumnTableRow>, CheckpointedFunction {
+
+        private static final long serialVersionUID = 1L;
+
+        private final List<String> rawDataValues;
+        private final int sleepDurationMs;
+        private final int failOnceAtNthElement;
+
+        private int numElementsEmitted = 0;
+
+        private final AtomicBoolean waitingForNextSnapshot;
+        private final AtomicBoolean awaitedSnapshotCreated;
+
+        private ListState<Integer> state = null;
+
+        private StreamingSource(final List<String> rawDataValues, final int sleepDurationMs, int failOnceAtNthElement) {
+            this.rawDataValues = rawDataValues;
+            checkArgument(sleepDurationMs > 0);
+            this.sleepDurationMs = sleepDurationMs;
+            checkArgument(failOnceAtNthElement == -1 || failOnceAtNthElement > MAX_ROWS_PER_SEGMENT);
+            this.failOnceAtNthElement = failOnceAtNthElement;
+
+            // Initializes exception raising logic
+            this.waitingForNextSnapshot = new AtomicBoolean(false);
+            this.awaitedSnapshotCreated = new AtomicBoolean(false);
+        }
+
+        @Override
+        public void run(final SourceContext<SingleColumnTableRow> ctx) throws Exception {
+            while (numElementsEmitted < rawDataValues.size()) {
+                if (!hasFailedOnce.get() && failOnceAtNthElement == numElementsEmitted) {
+                    failAfterNextSnapshot();
+                }
+
+                synchronized (ctx.getCheckpointLock()) {
+                    SingleColumnTableRow element = new SingleColumnTableRow(
+                            rawDataValues.get(numElementsEmitted), System.currentTimeMillis());
+                    ctx.collect(element);
+                    numElementsEmitted++;
+                }
+                Thread.sleep(sleepDurationMs);
+            }
+
+            // Keep generator source up until the test was completed.
+            latch.await();
+        }
+
+        /**
+         * When {@link #failOnceAtNthElement} elements were received, we raise an exception after
+         * the next checkpoint was created. We ensure that at least one segment has been committed
+         * to Pinot by then, as we require {@link #failOnceAtNthElement} to be greater than
+         * {@link #MAX_ROWS_PER_SEGMENT} (at a parallelism of 1). This allows checking whether
+         * snapshot creation and failure recovery in
+         * {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter} work properly,
+         * respecting the already committed elements and those stored in an active
+         * {@link org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment}: committed
+         * elements must not be included in the snapshot, whereas elements in an active segment must
+         * be, so that they can be recovered later on.
+         *
+         * @throws Exception
+         */
+        private void failAfterNextSnapshot() throws Exception {
+            hasFailedOnce.set(true);
+            waitingForNextSnapshot.set(true);
+
+            // Waiting for the next snapshot ensures that
+            // at least one segment has been committed to Pinot
+            while (!awaitedSnapshotCreated.get()) {
+                Thread.sleep(50);
+            }
+            throw new Exception(String.format("Source was expected to fail after %d elements", failOnceAtNthElement));
+        }
+
+        @Override
+        public void cancel() {
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            state = context.getOperatorStateStore()
+                    .getListState(new ListStateDescriptor<>("state", IntSerializer.INSTANCE));
+
+            for (Integer i : state.get()) {
+                numElementsEmitted += i;
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            state.clear();
+            state.add(numElementsEmitted);
+
+            // Notify that the awaited snapshot has been created
+            if (waitingForNextSnapshot.get()) {
+                awaitedSnapshotCreated.set(true);
+            }
+        }
+
+        static class Builder {
+            final List<String> rawDataValues;
+            final int sleepDurationMs;
+            int failOnceAtNthElement = -1;
+
+            Builder(List<String> rawDataValues, int sleepDurationMs) {
+                this.rawDataValues = rawDataValues;
+                this.sleepDurationMs = sleepDurationMs;
+            }
+
+            public Builder raiseFailureOnce(int failOnceAtNthElement) {
+                checkArgument(failOnceAtNthElement > MAX_ROWS_PER_SEGMENT,
+                        "failOnceAtNthElement (if set) is required to be larger than the number of elements per segment (MAX_ROWS_PER_SEGMENT).");
+                this.failOnceAtNthElement = failOnceAtNthElement;
+                return this;
+            }
+
+            public StreamingSource build() {
+                return new StreamingSource(rawDataValues, sleepDurationMs, failOnceAtNthElement);
+            }
+        }
+    }
+}
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
new file mode 100644
index 0000000..a5f5021
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.util.TestLogger;
+import org.apache.pinot.spi.config.table.*;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+
+/**
+ * Base class for PinotSink e2e tests
+ */
+@Testcontainers
+public class PinotTestBase extends TestLogger {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(PinotTestBase.class);
+
+    private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0";
+    private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000;
+    private static final Integer PINOT_INTERNAL_CONTROLLER_PORT = 9000;
+
+    protected static TableConfig TABLE_CONFIG;
+    protected static final Schema TABLE_SCHEMA = PinotTableConfig.getTableSchema();
+    protected static PinotTestHelper pinotHelper;
+
+    /**
+     * Creates the Pinot testcontainer. We delay the start of tests until Pinot has started all
+     * internal components. This is detected via an HTTP wait strategy that polls the controller's
+     * /instances endpoint until controller, broker and server instances are registered.
+     */
+    @Container
+    public static GenericContainer<?> pinot = new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))
+            .withCommand("QuickStart", "-type", "batch")
+            .withExposedPorts(PINOT_INTERNAL_BROKER_PORT, PINOT_INTERNAL_CONTROLLER_PORT)
+            .waitingFor(
+                    // Wait for controller, server and broker instances to be available
+                    new HttpWaitStrategy()
+                            .forPort(PINOT_INTERNAL_CONTROLLER_PORT)
+                            .forPath("/instances")
+                            .forStatusCode(200)
+                            .forResponsePredicate(res -> {
+                                try {
+                                    JsonNode instances = JsonUtils.stringToJsonNode(res).get("instances");
+                                    // Expect 3 instances to be up and running (controller, broker and server)
+                                    return instances.size() == 3;
+                                } catch (IOException e) {
+                                    LOG.error("Error while reading json response in HttpWaitStrategy.", e);
+                                }
+                                return false;
+                            })
+                            // Allow Pinot to take up to 180s for starting up
+                            .withStartupTimeout(Duration.ofSeconds(180))
+            );
+
+    /**
+     * Creates a new instance of the {@link PinotTestHelper} using the testcontainer port mappings
+     * and creates the test table.
+     *
+     * @throws IOException
+     */
+    @BeforeEach
+    public void setUp() throws IOException {
+        TABLE_CONFIG = PinotTableConfig.getTableConfig();
+        pinotHelper = new PinotTestHelper(getPinotHost(), getPinotControllerPort(), getPinotBrokerPort());
+        pinotHelper.createTable(TABLE_CONFIG, TABLE_SCHEMA);
+    }
+
+    /**
+     * Delete the test table after each test.
+     *
+     * @throws Exception
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        pinotHelper.deleteTable(TABLE_CONFIG, TABLE_SCHEMA);
+    }
+
+    /**
+     * Returns the current Pinot table name
+     *
+     * @return Pinot table name
+     */
+    protected String getTableName() {
+        return TABLE_CONFIG.getTableName();
+    }
+
+    /**
+     * Returns the host the Pinot container is available at
+     *
+     * @return Pinot container host
+     */
+    protected String getPinotHost() {
+        return pinot.getHost();
+    }
+
+
+    /**
+     * Returns the Pinot controller port from the container ports.
+     *
+     * @return Pinot controller port
+     */
+    protected String getPinotControllerPort() {
+        return pinot.getMappedPort(PINOT_INTERNAL_CONTROLLER_PORT).toString();
+    }
+
+    /**
+     * Returns the Pinot broker port from the container ports.
+     *
+     * @return Pinot broker port
+     */
+    private String getPinotBrokerPort() {
+        return pinot.getMappedPort(PINOT_INTERNAL_BROKER_PORT).toString();
+    }
+
+    /**
+     * Class defining the elements passed to the {@link PinotSink} during the tests.
+     */
+    protected static class SingleColumnTableRow {
+
+        private String _col1;
+        private Long _timestamp;
+
+        SingleColumnTableRow(@JsonProperty(value = "col1", required = true) String col1,
+                             @JsonProperty(value = "timestamp", required = true) Long timestamp) {
+            this._col1 = col1;
+            this._timestamp = timestamp;
+        }
+
+        @JsonProperty("col1")
+        public String getCol1() {
+            return this._col1;
+        }
+
+        public void setCol1(String _col1) {
+            this._col1 = _col1;
+        }
+
+        @JsonProperty("timestamp")
+        public Long getTimestamp() {
+            return this._timestamp;
+        }
+
+        public void setTimestamp(Long timestamp) {
+            this._timestamp = timestamp;
+        }
+    }
+
+    /**
+     * Serializes {@link SingleColumnTableRow} to JSON.
+     */
+    protected static class SingleColumnTableRowSerializer extends JsonSerializer<SingleColumnTableRow> {
+
+        @Override
+        public String toJson(SingleColumnTableRow element) {
+            return JsonUtils.objectToJsonNode(element).toString();
+        }
+    }
+
+    /**
+     * Pinot table configuration helpers.
+     */
+    private static class PinotTableConfig {
+
+        static final String TABLE_NAME_PREFIX = "FLTable";
+        static final String SCHEMA_NAME = "FLTableSchema";
+
+        private static SegmentsValidationAndRetentionConfig getValidationConfig() {
+            SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
+            validationConfig.setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy");
+            validationConfig.setSegmentPushType("APPEND");
+            validationConfig.setSchemaName(SCHEMA_NAME);
+            validationConfig.setReplication("1");
+            return validationConfig;
+        }
+
+        private static TenantConfig getTenantConfig() {
+            TenantConfig tenantConfig = new TenantConfig("DefaultTenant", "DefaultTenant", null);
+            return tenantConfig;
+        }
+
+        private static IndexingConfig getIndexingConfig() {
+            IndexingConfig indexingConfig = new IndexingConfig();
+            return indexingConfig;
+        }
+
+        private static TableCustomConfig getCustomConfig() {
+            TableCustomConfig customConfig = new TableCustomConfig(null);
+            return customConfig;
+        }
+
+        private static String generateTableName() {
+            // We want to use a new table name for each test in order to prevent interference
+            // with segments that were pushed in the previous test,
+            // but whose indexing by Pinot was delayed (thus, the previous test must have failed).
+            return String.format("%s_%d", TABLE_NAME_PREFIX, System.currentTimeMillis());
+        }
+
+        static TableConfig getTableConfig() {
+            return new TableConfig(
+                    generateTableName(),
+                    TableType.OFFLINE.name(),
+                    getValidationConfig(),
+                    getTenantConfig(),
+                    getIndexingConfig(),
+                    getCustomConfig(),
+                    null, null, null, null, null,
+                    null, null, null, null
+            );
+        }
+
+        static Schema getTableSchema() {
+            Schema schema = new Schema();
+            schema.setSchemaName(SCHEMA_NAME);
+            schema.addField(new DimensionFieldSpec("col1", FieldSpec.DataType.STRING, true));
+            schema.addField(new DimensionFieldSpec("timestamp", FieldSpec.DataType.STRING, true));
+            return schema;
+        }
+    }
+}
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
new file mode 100644
index 0000000..73a4403
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.pinot.client.*;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Helper class to interact with the Pinot controller and broker in the e2e tests
+ */
+public class PinotTestHelper implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PinotTestHelper.class);
+    private final String host;
+    private final String brokerPort;
+    private final PinotControllerHttpClient httpClient;
+
+    /**
+     * @param host           Host the Pinot controller and broker are accessible at
+     * @param controllerPort The Pinot controller's external port at {@code host}
+     * @param brokerPort     A Pinot broker's external port at {@code host}
+     */
+    public PinotTestHelper(String host, String controllerPort, String brokerPort) {
+        this.host = host;
+        this.brokerPort = brokerPort;
+        this.httpClient = new PinotControllerHttpClient(host, controllerPort);
+    }
+
+    /**
+     * Adds a Pinot table schema.
+     *
+     * @param tableSchema Pinot table schema to add
+     * @throws IOException
+     */
+    private void addSchema(Schema tableSchema) throws IOException {
+        PinotControllerHttpClient.ApiResponse res = httpClient.post("/schemas", JsonUtils.objectToString(tableSchema));
+        LOG.debug("Schema add request for schema {} returned {}", tableSchema.getSchemaName(), res.responseBody);
+        if (res.statusLine.getStatusCode() != 200) {
+            throw new PinotControllerApiException(res.responseBody);
+        }
+    }
+
+    /**
+     * Deletes a Pinot table schema.
+     *
+     * @param tableSchema Pinot table schema to delete
+     * @throws IOException
+     */
+    private void deleteSchema(Schema tableSchema) throws IOException {
+        PinotControllerHttpClient.ApiResponse res = httpClient.delete(String.format("/schemas/%s", tableSchema.getSchemaName()));
+        LOG.debug("Schema delete request for schema {} returned {}", tableSchema.getSchemaName(), res.responseBody);
+        if (res.statusLine.getStatusCode() != 200) {
+            throw new PinotControllerApiException(res.responseBody);
+        }
+    }
+
+    /**
+     * Creates a Pinot table.
+     *
+     * @param tableConfig Pinot table configuration of table to create
+     * @throws IOException
+     */
+    private void addTable(TableConfig tableConfig) throws IOException {
+        PinotControllerHttpClient.ApiResponse res = httpClient.post("/tables", JsonUtils.objectToString(tableConfig));
+        LOG.debug("Table creation request for table {} returned {}", tableConfig.getTableName(), res.responseBody);
+        if (res.statusLine.getStatusCode() != 200) {
+            throw new PinotControllerApiException(res.responseBody);
+        }
+    }
+
+    /**
+     * Deletes a Pinot table with all its segments.
+     *
+     * @param tableConfig Pinot table configuration of table to delete
+     * @throws IOException
+     */
+    private void removeTable(TableConfig tableConfig) throws IOException {
+        PinotControllerHttpClient.ApiResponse res = httpClient.delete(String.format("/tables/%s", tableConfig.getTableName()));
+        LOG.debug("Table deletion request for table {} returned {}", tableConfig.getTableName(), res.responseBody);
+        if (res.statusLine.getStatusCode() != 200) {
+            throw new PinotControllerApiException(res.responseBody);
+        }
+    }
+
+    /**
+     * Creates a Pinot table by first adding a schema and then creating the actual table using the
+     * Pinot table configuration
+     *
+     * @param tableConfig Pinot table configuration
+     * @param tableSchema Pinot table schema
+     * @throws IOException
+     */
+    public void createTable(TableConfig tableConfig, Schema tableSchema) throws IOException {
+        this.addSchema(tableSchema);
+        this.addTable(tableConfig);
+    }
+
+    /**
+     * Deletes a Pinot table by first deleting the table and its segments and then deleting the
+     * table's schema.
+     *
+     * @param tableConfig Pinot table configuration
+     * @param tableSchema Pinot table schema
+     * @throws IOException
+     */
+    public void deleteTable(TableConfig tableConfig, Schema tableSchema) throws IOException {
+        this.removeTable(tableConfig);
+        this.deleteSchema(tableSchema);
+    }
+
+    /**
+     * Fetch table entries via the Pinot broker.
+     *
+     * @param tableName          Target table's name
+     * @param maxNumberOfEntries Max number of entries to fetch
+     * @return ResultSet
+     * @throws PinotControllerApiException
+     */
+    public ResultSet getTableEntries(String tableName, Integer maxNumberOfEntries) throws PinotControllerApiException {
+        Connection brokerConnection = null;
+        try {
+            String brokerHostPort = String.format("%s:%s", this.host, this.brokerPort);
+            brokerConnection = ConnectionFactory.fromHostList(brokerHostPort);
+            String query = String.format("SELECT * FROM %s LIMIT %d", tableName, maxNumberOfEntries);
+
+            Request pinotClientRequest = new Request("sql", query);
+            ResultSetGroup pinotResultSetGroup = brokerConnection.execute(pinotClientRequest);
+
+            if (pinotResultSetGroup.getResultSetCount() != 1) {
+                throw new PinotControllerApiException("Could not find any data in Pinot cluster.");
+            }
+            return pinotResultSetGroup.getResultSet(0);
+        } finally {
+            if (brokerConnection != null) {
+                brokerConnection.close();
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        httpClient.close();
+    }
+}
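Taken together, the tests in this commit drive the helper roughly as follows; host and ports are placeholders here, since the real values come from the testcontainer port mappings in PinotTestBase, and tableConfig/tableSchema stand for the objects built there.

    // Sketch of the helper lifecycle as exercised by PinotTestBase and PinotSinkTest.
    PinotTestHelper helper = new PinotTestHelper("localhost", "9000", "8000");
    helper.createTable(tableConfig, tableSchema);          // adds the schema first, then the table
    // ... execute the Flink job that sinks rows into the table ...
    ResultSet resultSet = helper.getTableEntries(tableConfig.getTableName(), 100);
    helper.deleteTable(tableConfig, tableSchema);
    helper.close();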
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/log4j.properties
new file mode 100644
index 0000000..b15f2be
--- /dev/null
+++ b/flink-connector-pinot/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=DEBUG, console
+
+# Log everything to the console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/pom.xml b/pom.xml
index 7e83d85..57eb579 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
     <module>flink-connector-influxdb</module>
     <module>flink-connector-kudu</module>
     <module>flink-connector-netty</module>
+    <module>flink-connector-pinot</module>
     <module>flink-connector-redis</module>
     <module>flink-library-siddhi</module>
   </modules>