You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/12/03 18:59:55 UTC

[flink-connector-aws] 03/08: [FLINK-29907][Connectors/Firehose] Externalize Amazon Firehose connectors from Flink repo

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git

commit 5993e34901152396a937cee399ced3beef602f0f
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Fri Dec 2 09:31:35 2022 +0000

    [FLINK-29907][Connectors/Firehose] Externalize Amazon Firehose connectors from Flink repo
---
 .../54da9a7d-14d2-4632-a045-1dd8fc665c8f           |   0
 .../a6cbd99c-b115-447a-8f19-43c1094db549           |   6 +
 .../archunit-violations/stored.rules               |   4 +
 flink-connector-aws-kinesis-firehose/pom.xml       | 153 ++++++++++++
 .../sink/KinesisFirehoseConfigConstants.java       |  32 +++
 .../firehose/sink/KinesisFirehoseException.java    |  54 +++++
 .../firehose/sink/KinesisFirehoseSink.java         | 135 +++++++++++
 .../firehose/sink/KinesisFirehoseSinkBuilder.java  | 164 +++++++++++++
 .../sink/KinesisFirehoseSinkElementConverter.java  | 104 ++++++++
 .../firehose/sink/KinesisFirehoseSinkWriter.java   | 264 +++++++++++++++++++++
 .../sink/KinesisFirehoseStateSerializer.java       |  52 ++++
 .../table/KinesisFirehoseConnectorOptions.java     |  43 ++++
 .../firehose/table/KinesisFirehoseDynamicSink.java | 183 ++++++++++++++
 .../table/KinesisFirehoseDynamicTableFactory.java  |  89 +++++++
 .../util/KinesisFirehoseConnectorOptionUtils.java  |  67 ++++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../src/main/resources/log4j2.properties           |  25 ++
 .../architecture/TestCodeArchitectureTest.java     |  40 ++++
 .../sink/KinesisFirehoseSinkBuilderTest.java       |  81 +++++++
 .../KinesisFirehoseSinkElementConverterTest.java   |  54 +++++
 .../firehose/sink/KinesisFirehoseSinkITCase.java   | 125 ++++++++++
 .../firehose/sink/KinesisFirehoseSinkTest.java     | 132 +++++++++++
 .../sink/KinesisFirehoseSinkWriterTest.java        | 106 +++++++++
 .../sink/KinesisFirehoseStateSerializerTest.java   |  56 +++++
 .../sink/testutils/KinesisFirehoseTestUtils.java   |  86 +++++++
 .../KinesisFirehoseDynamicTableFactoryTest.java    | 157 ++++++++++++
 .../org.junit.jupiter.api.extension.Extension      |  16 ++
 .../src/test/resources/archunit.properties         |  31 +++
 .../src/test/resources/log4j2-test.properties      |  28 +++
 flink-sql-connector-aws-kinesis-firehose/pom.xml   | 107 +++++++++
 .../src/main/resources/META-INF/NOTICE             |  48 ++++
 pom.xml                                            |   2 +
 32 files changed, 2460 insertions(+)

diff --git a/flink-connector-aws-kinesis-firehose/archunit-violations/54da9a7d-14d2-4632-a045-1dd8fc665c8f b/flink-connector-aws-kinesis-firehose/archunit-violations/54da9a7d-14d2-4632-a045-1dd8fc665c8f
new file mode 100644
index 0000000..e69de29
diff --git a/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549 b/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
new file mode 100644
index 0000000..5ad7b14
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
@@ -0,0 +1,6 @@
+org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connector-aws-kinesis-firehose/archunit-violations/stored.rules b/flink-connector-aws-kinesis-firehose/archunit-violations/stored.rules
new file mode 100644
index 0000000..cf8b667
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Tue Feb 22 12:19:26 CET 2022
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=54da9a7d-14d2-4632-a045-1dd8fc665c8f
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=a6cbd99c-b115-447a-8f19-43c1094db549
diff --git a/flink-connector-aws-kinesis-firehose/pom.xml b/flink-connector-aws-kinesis-firehose/pom.xml
new file mode 100644
index 0000000..8fa9179
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/pom.xml
@@ -0,0 +1,153 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
+    <name>Flink : Connectors : AWS : Amazon Kinesis Data Firehose</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>firehose</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>netty-nio-client</artifactId>
+        </dependency>
+
+        <!--Table Api Dependencies-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>iam</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- ArchUit test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-architecture-tests-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
new file mode 100644
index 0000000..527f74a
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Defaults for {@link KinesisFirehoseSinkWriter}. */
+@PublicEvolving
+public class KinesisFirehoseConfigConstants {
+
+    public static final String BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT =
+            "Apache Flink %s (%s) Firehose Connector";
+
+    /** Firehose identifier for user agent prefix. */
+    public static final String FIREHOSE_CLIENT_USER_AGENT_PREFIX =
+            "aws.firehose.client.user-agent-prefix";
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
new file mode 100644
index 0000000..e76c10b
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@link RuntimeException} wrapper indicating the exception was thrown from the Kinesis Data
+ * Firehose Sink.
+ */
+@PublicEvolving
+class KinesisFirehoseException extends RuntimeException {
+
+    public KinesisFirehoseException(final String message) {
+        super(message);
+    }
+
+    public KinesisFirehoseException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * When the flag {@code failOnError} is set in {@link KinesisFirehoseSinkWriter}, this exception
+     * is raised as soon as any exception occurs when writing to KDF.
+     */
+    static class KinesisFirehoseFailFastException extends KinesisFirehoseException {
+
+        private static final String ERROR_MESSAGE =
+                "Encountered an exception while persisting records, not retrying due to {failOnError} being set.";
+
+        public KinesisFirehoseFailFastException() {
+            super(ERROR_MESSAGE);
+        }
+
+        public KinesisFirehoseFailFastException(final Throwable cause) {
+            super(ERROR_MESSAGE, cause);
+        }
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
new file mode 100644
index 0000000..6f9ed54
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery
+ * stream using the buffering protocol specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link
+ * software.amazon.awssdk.services.firehose.FirehoseAsyncClient} to communicate with the AWS
+ * endpoint.
+ *
+ * <p>Please see the writer implementation in {@link KinesisFirehoseSinkWriter}
+ *
+ * @param <InputT> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class KinesisFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> {
+
+    private final boolean failOnError;
+    private final String deliveryStreamName;
+    private final Properties firehoseClientProperties;
+
+    KinesisFirehoseSink(
+            ElementConverter<InputT, Record> elementConverter,
+            Integer maxBatchSize,
+            Integer maxInFlightRequests,
+            Integer maxBufferedRequests,
+            Long maxBatchSizeInBytes,
+            Long maxTimeInBufferMS,
+            Long maxRecordSizeInBytes,
+            boolean failOnError,
+            String deliveryStreamName,
+            Properties firehoseClientProperties) {
+        super(
+                elementConverter,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.deliveryStreamName =
+                Preconditions.checkNotNull(
+                        deliveryStreamName,
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+        Preconditions.checkArgument(
+                !this.deliveryStreamName.isEmpty(),
+                "The delivery stream name must be set when initializing the KDF Sink.");
+        this.failOnError = failOnError;
+        this.firehoseClientProperties = firehoseClientProperties;
+    }
+
+    /**
+     * Create a {@link KinesisFirehoseSinkBuilder} to allow the fluent construction of a new {@code
+     * KinesisFirehoseSink}.
+     *
+     * @param <InputT> type of incoming records
+     * @return {@link KinesisFirehoseSinkBuilder}
+     */
+    public static <InputT> KinesisFirehoseSinkBuilder<InputT> builder() {
+        return new KinesisFirehoseSinkBuilder<>();
+    }
+
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<Record>> createWriter(
+            InitContext context) throws IOException {
+        return new KinesisFirehoseSinkWriter<>(
+                getElementConverter(),
+                context,
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                failOnError,
+                deliveryStreamName,
+                firehoseClientProperties,
+                Collections.emptyList());
+    }
+
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<Record>> restoreWriter(
+            InitContext context, Collection<BufferedRequestState<Record>> recoveredState)
+            throws IOException {
+        return new KinesisFirehoseSinkWriter<>(
+                getElementConverter(),
+                context,
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                failOnError,
+                deliveryStreamName,
+                firehoseClientProperties,
+                recoveredState);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<BufferedRequestState<Record>> getWriterStateSerializer() {
+        return new KinesisFirehoseStateSerializer();
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
new file mode 100644
index 0000000..087fd6b
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link KinesisFirehoseSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link KinesisFirehoseSink} that
+ * writes String values to a Kinesis Data Firehose delivery stream named delivery-stream-name.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * KinesisFirehoseSink<String> kdfSink =
+ *         KinesisFirehoseSink.<String>builder()
+ *                 .setElementConverter(elementConverter)
+ *                 .setDeliveryStreamName("delivery-stream-name")
+ *                 .setMaxBatchSize(20)
+ *                 .setFirehoseClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 500
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 10000
+ *   <li>{@code maxBatchSizeInBytes} will be 4 MB i.e. {@code 4 * 1024 * 1024}
+ *   <li>{@code maxTimeInBufferMS} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 1000 KB i.e. {@code 1000 * 1024}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class KinesisFirehoseSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, Record, KinesisFirehoseSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 500;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 4 * 1024 * 1024;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000 * 1024;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+    private Boolean failOnError;
+    private String deliveryStreamName;
+    private Properties firehoseClientProperties;
+    private SerializationSchema<InputT> serializationSchema;
+
+    KinesisFirehoseSinkBuilder() {}
+
+    /**
+     * Sets the name of the KDF delivery stream that the sink will connect to. There is no default
+     * for this parameter, therefore, this must be provided at sink creation time otherwise the
+     * build will fail.
+     *
+     * @param deliveryStreamName the name of the delivery stream
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setDeliveryStreamName(String deliveryStreamName) {
+        this.deliveryStreamName = deliveryStreamName;
+        return this;
+    }
+
+    /**
+     * Allows the user to specify a serialization schema to serialize each record to persist to
+     * Firehose.
+     *
+     * @param serializationSchema serialization schema to use
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setSerializationSchema(
+            SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+        return this;
+    }
+
+    /**
+     * If writing to Kinesis Data Firehose results in a partial or full failure being returned, the
+     * job will fail immediately with a {@link KinesisFirehoseException} if failOnError is set.
+     *
+     * @param failOnError whether to fail on error
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setFailOnError(boolean failOnError) {
+        this.failOnError = failOnError;
+        return this;
+    }
+
+    /**
+     * A set of properties used by the sink to create the firehose client. This may be used to set
+     * the aws region, credentials etc. See the docs for usage and syntax.
+     *
+     * @param firehoseClientProperties Firehose client properties
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setFirehoseClientProperties(
+            Properties firehoseClientProperties) {
+        this.firehoseClientProperties = firehoseClientProperties;
+        return this;
+    }
+
+    @VisibleForTesting
+    Properties getClientPropertiesWithDefaultHttpProtocol() {
+        Properties clientProperties =
+                Optional.ofNullable(firehoseClientProperties).orElse(new Properties());
+        clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL.toString());
+        return clientProperties;
+    }
+
+    @Override
+    public KinesisFirehoseSink<InputT> build() {
+        return new KinesisFirehoseSink<>(
+                KinesisFirehoseSinkElementConverter.<InputT>builder()
+                        .setSerializationSchema(serializationSchema)
+                        .build(),
+                Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
+                Optional.ofNullable(getMaxInFlightRequests())
+                        .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
+                Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
+                Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
+                Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
+                Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
+                Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR),
+                deliveryStreamName,
+                getClientPropertiesWithDefaultHttpProtocol());
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
new file mode 100644
index 0000000..b90db33
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS Kinesis SDK v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
+ * {@link Record} that may be persisted.
+ */
+@Internal
+public class KinesisFirehoseSinkElementConverter<InputT>
+        implements ElementConverter<InputT, Record> {
+    private boolean schemaOpened = false;
+
+    /** A serialization schema to specify how the input element should be serialized. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private KinesisFirehoseSinkElementConverter(SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public Record apply(InputT element, SinkWriter.Context context) {
+        checkOpened();
+        return Record.builder()
+                .data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
+                .build();
+    }
+
+    private void checkOpened() {
+        if (!schemaOpened) {
+            try {
+                serializationSchema.open(
+                        new SerializationSchema.InitializationContext() {
+                            @Override
+                            public MetricGroup getMetricGroup() {
+                                return new UnregisteredMetricsGroup();
+                            }
+
+                            @Override
+                            public UserCodeClassLoader getUserCodeClassLoader() {
+                                return SimpleUserCodeClassLoader.create(
+                                        KinesisFirehoseSinkElementConverter.class.getClassLoader());
+                            }
+                        });
+                schemaOpened = true;
+            } catch (Exception e) {
+                throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+            }
+        }
+    }
+
+    public static <InputT> Builder<InputT> builder() {
+        return new Builder<>();
+    }
+
+    /** A builder for the KinesisFirehoseSinkElementConverter. */
+    public static class Builder<InputT> {
+
+        private SerializationSchema<InputT> serializationSchema;
+
+        public Builder<InputT> setSerializationSchema(
+                SerializationSchema<InputT> serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        public KinesisFirehoseSinkElementConverter<InputT> build() {
+            Preconditions.checkNotNull(
+                    serializationSchema,
+                    "No SerializationSchema was supplied to the " + "KinesisFirehoseSink builder.");
+            return new KinesisFirehoseSinkElementConverter<>(serializationSchema);
+        }
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
new file mode 100644
index 0000000..c16018d
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
+import software.amazon.awssdk.services.firehose.model.Record;
+import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link KinesisFirehoseSink} to write to Kinesis Data Firehose. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * KinesisFirehoseSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkWriter.class);
+
+    private static SdkAsyncHttpClient createHttpClient(Properties firehoseClientProperties) {
+        return AWSGeneralUtil.createAsyncHttpClient(firehoseClientProperties);
+    }
+
+    private static FirehoseAsyncClient createFirehoseClient(
+            Properties firehoseClientProperties, SdkAsyncHttpClient httpClient) {
+        AWSGeneralUtil.validateAwsCredentials(firehoseClientProperties);
+        return AWSAsyncSinkUtil.createAwsAsyncClient(
+                firehoseClientProperties,
+                httpClient,
+                FirehoseAsyncClient.builder(),
+                KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT,
+                KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
+    }
+
+    private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER =
+            FatalExceptionClassifier.withRootCauseOfType(
+                    ResourceNotFoundException.class,
+                    err ->
+                            new KinesisFirehoseException(
+                                    "Encountered non-recoverable exception relating to not being able to find the specified resources",
+                                    err));
+
+    private static final FatalExceptionClassifier FIREHOSE_FATAL_EXCEPTION_CLASSIFIER =
+            FatalExceptionClassifier.createChain(
+                    getInterruptedExceptionClassifier(),
+                    getInvalidCredentialsExceptionClassifier(),
+                    RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
+                    getSdkClientMisconfiguredExceptionClassifier());
+
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* Name of the delivery stream in Kinesis Data Firehose */
+    private final String deliveryStreamName;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    /* The asynchronous http client */
+    private final SdkAsyncHttpClient httpClient;
+
+    /* The asynchronous Firehose client */
+    private final FirehoseAsyncClient firehoseClient;
+
+    /* Flag to whether fatally fail any time we encounter an exception when persisting records */
+    private final boolean failOnError;
+
+    KinesisFirehoseSinkWriter(
+            ElementConverter<InputT, Record> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            String deliveryStreamName,
+            Properties firehoseClientProperties) {
+        this(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                failOnError,
+                deliveryStreamName,
+                firehoseClientProperties,
+                Collections.emptyList());
+    }
+
+    KinesisFirehoseSinkWriter(
+            ElementConverter<InputT, Record> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            String deliveryStreamName,
+            Properties firehoseClientProperties,
+            Collection<BufferedRequestState<Record>> initialStates) {
+        super(
+                elementConverter,
+                context,
+                AsyncSinkWriterConfiguration.builder()
+                        .setMaxBatchSize(maxBatchSize)
+                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+                        .setMaxInFlightRequests(maxInFlightRequests)
+                        .setMaxBufferedRequests(maxBufferedRequests)
+                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
+                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .build(),
+                initialStates);
+        this.failOnError = failOnError;
+        this.deliveryStreamName = deliveryStreamName;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
+        this.httpClient = createHttpClient(firehoseClientProperties);
+        this.firehoseClient = createFirehoseClient(firehoseClientProperties, httpClient);
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<Record> requestEntries, Consumer<List<Record>> requestResult) {
+
+        PutRecordBatchRequest batchRequest =
+                PutRecordBatchRequest.builder()
+                        .records(requestEntries)
+                        .deliveryStreamName(deliveryStreamName)
+                        .build();
+
+        CompletableFuture<PutRecordBatchResponse> future =
+                firehoseClient.putRecordBatch(batchRequest);
+
+        future.whenComplete(
+                (response, err) -> {
+                    if (err != null) {
+                        handleFullyFailedRequest(err, requestEntries, requestResult);
+                    } else if (response.failedPutCount() > 0) {
+                        handlePartiallyFailedRequest(response, requestEntries, requestResult);
+                    } else {
+                        requestResult.accept(Collections.emptyList());
+                    }
+                });
+    }
+
+    @Override
+    protected long getSizeInBytes(Record requestEntry) {
+        return requestEntry.data().asByteArrayUnsafe().length;
+    }
+
+    @Override
+    public void close() {
+        AWSGeneralUtil.closeResources(httpClient, firehoseClient);
+    }
+
+    private void handleFullyFailedRequest(
+            Throwable err, List<Record> requestEntries, Consumer<List<Record>> requestResult) {
+        LOG.debug(
+                "KDF Sink failed to write and will retry {} entries to KDF first request was {}",
+                requestEntries.size(),
+                requestEntries.get(0).toString(),
+                err);
+        numRecordsOutErrorsCounter.inc(requestEntries.size());
+
+        if (isRetryable(err)) {
+            requestResult.accept(requestEntries);
+        }
+    }
+
+    private void handlePartiallyFailedRequest(
+            PutRecordBatchResponse response,
+            List<Record> requestEntries,
+            Consumer<List<Record>> requestResult) {
+        LOG.debug(
+                "KDF Sink failed to write and will retry {} entries to KDF first request was {}",
+                requestEntries.size(),
+                requestEntries.get(0).toString());
+        numRecordsOutErrorsCounter.inc(response.failedPutCount());
+
+        if (failOnError) {
+            getFatalExceptionCons()
+                    .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException());
+            return;
+        }
+        List<Record> failedRequestEntries = new ArrayList<>(response.failedPutCount());
+        List<PutRecordBatchResponseEntry> records = response.requestResponses();
+
+        for (int i = 0; i < records.size(); i++) {
+            if (records.get(i).errorCode() != null) {
+                failedRequestEntries.add(requestEntries.get(i));
+            }
+        }
+
+        requestResult.accept(failedRequestEntries);
+    }
+
+    private boolean isRetryable(Throwable err) {
+        if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
+            return false;
+        }
+        if (failOnError) {
+            getFatalExceptionCons()
+                    .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
+            return false;
+        }
+
+        return true;
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializer.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializer.java
new file mode 100644
index 0000000..36162e6
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Kinesis Firehose implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class KinesisFirehoseStateSerializer extends AsyncSinkWriterStateSerializer<Record> {
+    @Override
+    protected void serializeRequestToStream(Record request, DataOutputStream out)
+            throws IOException {
+        out.write(request.data().asByteArrayUnsafe());
+    }
+
+    @Override
+    protected Record deserializeRequestFromStream(long requestSize, DataInputStream in)
+            throws IOException {
+        byte[] requestData = new byte[(int) requestSize];
+        in.read(requestData);
+        return Record.builder().data(SdkBytes.fromByteArray(requestData)).build();
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseConnectorOptions.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseConnectorOptions.java
new file mode 100644
index 0000000..92773c6
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseConnectorOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
+
+/** Options for the Kinesis firehose connector. */
+@PublicEvolving
+public class KinesisFirehoseConnectorOptions extends AsyncSinkConnectorOptions {
+
+    public static final ConfigOption<String> DELIVERY_STREAM =
+            ConfigOptions.key("delivery-stream")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the Kinesis Firehose delivery stream backing this table.");
+
+    public static final ConfigOption<Boolean> SINK_FAIL_ON_ERROR =
+            ConfigOptions.key("sink.fail-on-error")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional fail on error value for kinesis Firehose sink, default is false");
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
new file mode 100644
index 0000000..149dbaf
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/** Kinesis firehose backed {@link AsyncDynamicTableSink}. */
+@Internal
+public class KinesisFirehoseDynamicSink extends AsyncDynamicTableSink<Record> {
+
+    /** Consumed data type of the table. */
+    private final DataType consumedDataType;
+
+    /** The Firehose delivery stream to write to. */
+    private final String deliveryStream;
+
+    /** Properties for the Firehose DataStream Sink. */
+    private final Properties firehoseClientProperties;
+
+    /** Sink format for encoding records to Kinesis. */
+    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
+
+    private final Boolean failOnError;
+
+    protected KinesisFirehoseDynamicSink(
+            @Nullable Integer maxBatchSize,
+            @Nullable Integer maxInFlightRequests,
+            @Nullable Integer maxBufferedRequests,
+            @Nullable Long maxBufferSizeInBytes,
+            @Nullable Long maxTimeInBufferMS,
+            @Nullable Boolean failOnError,
+            @Nullable DataType consumedDataType,
+            String deliveryStream,
+            @Nullable Properties firehoseClientProperties,
+            EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+        super(
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBufferSizeInBytes,
+                maxTimeInBufferMS);
+        this.failOnError = failOnError;
+        this.firehoseClientProperties = firehoseClientProperties;
+        this.consumedDataType =
+                Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null");
+        this.deliveryStream =
+                Preconditions.checkNotNull(
+                        deliveryStream, "Firehose Delivery stream name must not be null");
+        this.encodingFormat =
+                Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null");
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return encodingFormat.getChangelogMode();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        SerializationSchema<RowData> serializationSchema =
+                encodingFormat.createRuntimeEncoder(context, consumedDataType);
+
+        KinesisFirehoseSinkBuilder<RowData> builder =
+                KinesisFirehoseSink.<RowData>builder()
+                        .setSerializationSchema(serializationSchema)
+                        .setFirehoseClientProperties(firehoseClientProperties)
+                        .setDeliveryStreamName(deliveryStream);
+
+        Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError);
+        super.addAsyncOptionsToSinkBuilder(builder);
+
+        return SinkV2Provider.of(builder.build());
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new KinesisFirehoseDynamicSink(
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBufferSizeInBytes,
+                maxTimeInBufferMS,
+                failOnError,
+                consumedDataType,
+                deliveryStream,
+                firehoseClientProperties,
+                encodingFormat);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "firehose";
+    }
+
+    /** Builder class for {@link KinesisFirehoseDynamicSink}. */
+    @Internal
+    public static class KinesisFirehoseDynamicSinkBuilder
+            extends AsyncDynamicTableSinkBuilder<Record, KinesisFirehoseDynamicSinkBuilder> {
+
+        private DataType consumedDataType = null;
+        private String deliveryStream = null;
+        private Properties firehoseClientProperties = null;
+        private EncodingFormat<SerializationSchema<RowData>> encodingFormat = null;
+        private Boolean failOnError = null;
+
+        public KinesisFirehoseDynamicSinkBuilder setConsumedDataType(DataType consumedDataType) {
+            this.consumedDataType = consumedDataType;
+            return this;
+        }
+
+        public KinesisFirehoseDynamicSinkBuilder setDeliveryStream(String deliveryStream) {
+            this.deliveryStream = deliveryStream;
+            return this;
+        }
+
+        public KinesisFirehoseDynamicSinkBuilder setFirehoseClientProperties(
+                Properties firehoseClientProperties) {
+            this.firehoseClientProperties = firehoseClientProperties;
+            return this;
+        }
+
+        public KinesisFirehoseDynamicSinkBuilder setEncodingFormat(
+                EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+            this.encodingFormat = encodingFormat;
+            return this;
+        }
+
+        public KinesisFirehoseDynamicSinkBuilder setFailOnError(Boolean failOnError) {
+            this.failOnError = failOnError;
+            return this;
+        }
+
+        @Override
+        public KinesisFirehoseDynamicSink build() {
+            return new KinesisFirehoseDynamicSink(
+                    getMaxBatchSize(),
+                    getMaxInFlightRequests(),
+                    getMaxBufferedRequests(),
+                    getMaxBufferSizeInBytes(),
+                    getMaxTimeInBufferMS(),
+                    failOnError,
+                    consumedDataType,
+                    deliveryStream,
+                    firehoseClientProperties,
+                    encodingFormat);
+        }
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java
new file mode 100644
index 0000000..a7ca38e
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
+import org.apache.flink.connector.firehose.table.util.KinesisFirehoseConnectorOptionUtils;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.DELIVERY_STREAM;
+import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.SINK_FAIL_ON_ERROR;
+import static org.apache.flink.connector.firehose.table.util.KinesisFirehoseConnectorOptionUtils.FIREHOSE_CLIENT_PROPERTIES_KEY;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+
+/** Factory for creating {@link KinesisFirehoseDynamicSink} . */
+@Internal
+public class KinesisFirehoseDynamicTableFactory extends AsyncDynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "firehose";
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+
+        AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context);
+
+        KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder builder =
+                new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder();
+
+        KinesisFirehoseConnectorOptionUtils optionsUtils =
+                new KinesisFirehoseConnectorOptionUtils(
+                        factoryContext.getResolvedOptions(), factoryContext.getTableOptions());
+        // validate the data types of the table options
+        factoryContext
+                .getFactoryHelper()
+                .validateExcept(optionsUtils.getNonValidatedPrefixes().toArray(new String[0]));
+        Properties properties = optionsUtils.getSinkProperties();
+
+        builder.setDeliveryStream((String) properties.get(DELIVERY_STREAM.key()))
+                .setFirehoseClientProperties(
+                        (Properties) properties.get(FIREHOSE_CLIENT_PROPERTIES_KEY))
+                .setEncodingFormat(factoryContext.getEncodingFormat())
+                .setConsumedDataType(factoryContext.getPhysicalDataType());
+        Optional.ofNullable((Boolean) properties.get(SINK_FAIL_ON_ERROR.key()))
+                .ifPresent(builder::setFailOnError);
+        return super.addAsyncOptionsToBuilder(properties, builder).build();
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DELIVERY_STREAM);
+        options.add(FORMAT);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = super.optionalOptions();
+        options.add(SINK_FAIL_ON_ERROR);
+        return options;
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/util/KinesisFirehoseConnectorOptionUtils.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/util/KinesisFirehoseConnectorOptionUtils.java
new file mode 100644
index 0000000..bd1ccfb
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/util/KinesisFirehoseConnectorOptionUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.table.util.AsyncClientOptionsUtils;
+import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.DELIVERY_STREAM;
+import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.SINK_FAIL_ON_ERROR;
+
+/** Class for extracting firehose configurations from table options. */
+@Internal
+public class KinesisFirehoseConnectorOptionUtils {
+
+    public static final String FIREHOSE_CLIENT_PROPERTIES_KEY = "sink.client.properties";
+
+    private final AsyncSinkConfigurationValidator asyncSinkConfigurationValidator;
+    private final AsyncClientOptionsUtils asyncClientOptionsUtils;
+    private final Map<String, String> resolvedOptions;
+    private final ReadableConfig tableOptions;
+
+    public KinesisFirehoseConnectorOptionUtils(
+            Map<String, String> resolvedOptions, ReadableConfig tableOptions) {
+        this.asyncSinkConfigurationValidator = new AsyncSinkConfigurationValidator(tableOptions);
+        this.asyncClientOptionsUtils = new AsyncClientOptionsUtils(resolvedOptions);
+        this.resolvedOptions = resolvedOptions;
+        this.tableOptions = tableOptions;
+    }
+
+    public List<String> getNonValidatedPrefixes() {
+        return this.asyncClientOptionsUtils.getNonValidatedPrefixes();
+    }
+
+    public Properties getSinkProperties() {
+        Properties properties = asyncSinkConfigurationValidator.getValidatedConfigurations();
+        properties.put(DELIVERY_STREAM.key(), tableOptions.get(DELIVERY_STREAM));
+        Properties kinesisClientProps = asyncClientOptionsUtils.getValidatedConfigurations();
+        properties.put(FIREHOSE_CLIENT_PROPERTIES_KEY, kinesisClientProps);
+        if (tableOptions.getOptional(SINK_FAIL_ON_ERROR).isPresent()) {
+            properties.put(
+                    SINK_FAIL_ON_ERROR.key(), tableOptions.getOptional(SINK_FAIL_ON_ERROR).get());
+        }
+        return properties;
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-aws-kinesis-firehose/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..2147c30
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.firehose.table.KinesisFirehoseDynamicTableFactory
diff --git a/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties b/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..c64a340
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
new file mode 100644
index 0000000..d7fbc74
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+/** Architecture tests for test code. */
+@AnalyzeClasses(
+        packages = "org.apache.flink.connector.firehose",
+        importOptions = {
+            ImportOption.OnlyIncludeTests.class,
+            ImportOptions.ExcludeScalaImportOption.class,
+            ImportOptions.ExcludeShadedImportOption.class
+        })
+public class TestCodeArchitectureTest {
+
+    @ArchTest
+    public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
new file mode 100644
index 0000000..e375193
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */
+class KinesisFirehoseSinkBuilderTest {
+
+    private static final SerializationSchema<String> SERIALIZATION_SCHEMA =
+            new SimpleStringSchema();
+
+    @Test
+    void elementConverterOfSinkMustBeSetWhenBuilt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisFirehoseSink.builder()
+                                        .setDeliveryStreamName("deliveryStream")
+                                        .build())
+                .withMessageContaining(
+                        "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
+    }
+
+    @Test
+    void streamNameOfSinkMustBeSetWhenBuilt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisFirehoseSink.<String>builder()
+                                        .setSerializationSchema(SERIALIZATION_SCHEMA)
+                                        .build())
+                .withMessageContaining(
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+    }
+
+    @Test
+    void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisFirehoseSink.<String>builder()
+                                        .setDeliveryStreamName("")
+                                        .setSerializationSchema(SERIALIZATION_SCHEMA)
+                                        .build())
+                .withMessageContaining(
+                        "The delivery stream name must be set when initializing the KDF Sink.");
+    }
+
+    @Test
+    void defaultProtocolVersionInsertedToConfiguration() {
+        Properties expectedProps = new Properties();
+        expectedProps.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
+        Properties defaultProperties =
+                KinesisFirehoseSink.<String>builder().getClientPropertiesWithDefaultHttpProtocol();
+
+        Assertions.assertThat(defaultProperties).isEqualTo(expectedProps);
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
new file mode 100644
index 0000000..ccb4582
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Covers construction and sanity checking of {@link KinesisFirehoseSinkElementConverter}. */
+class KinesisFirehoseSinkElementConverterTest {
+
+    @Test
+    void elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(() -> KinesisFirehoseSinkElementConverter.<String>builder().build())
+                .withMessageContaining(
+                        "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
+    }
+
+    @Test
+    void elementConverterUsesProvidedSchemaToSerializeRecord() {
+        ElementConverter<String, Record> elementConverter =
+                KinesisFirehoseSinkElementConverter.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .build();
+
+        String testString = "{many hands make light work;";
+
+        Record serializedRecord = elementConverter.apply(testString, null);
+        byte[] serializedString = (new SimpleStringSchema()).serialize(testString);
+        assertThat(serializedRecord.data()).isEqualTo(SdkBytes.fromByteArray(serializedString));
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
new file mode 100644
index 0000000..2e4105a
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
+import software.amazon.awssdk.services.iam.IamClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.util.List;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIamClient;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.readObjectsFromS3Bucket;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createFirehoseClient;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */
+@Testcontainers
+class KinesisFirehoseSinkITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
+    private static final String ROLE_NAME = "super-role";
+    private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME;
+    private static final String BUCKET_NAME = "s3-firehose";
+    private static final String STREAM_NAME = "s3-stream";
+    private static final int NUMBER_OF_ELEMENTS = 92;
+    private StreamExecutionEnvironment env;
+
+    private SdkHttpClient httpClient;
+    private S3Client s3Client;
+    private FirehoseClient firehoseClient;
+    private IamClient iamClient;
+
+    @Container
+    private static LocalstackContainer mockFirehoseContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
+
+    @BeforeEach
+    void setup() {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+        httpClient = AWSServicesTestUtils.createHttpClient();
+        s3Client = createS3Client(mockFirehoseContainer.getEndpoint(), httpClient);
+        firehoseClient = createFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient);
+        iamClient = createIamClient(mockFirehoseContainer.getEndpoint(), httpClient);
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    @AfterEach
+    void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+    }
+
+    @Test
+    void firehoseSinkWritesCorrectDataToMockAWSServices() throws Exception {
+        LOG.info("1 - Creating the bucket for Firehose to deliver into...");
+        createBucket(s3Client, BUCKET_NAME);
+        LOG.info("2 - Creating the IAM Role for Firehose to write into the s3 bucket...");
+        createIAMRole(iamClient, ROLE_NAME);
+        LOG.info("3 - Creating the Firehose delivery stream...");
+        createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN, firehoseClient);
+
+        KinesisFirehoseSink<String> kdsSink =
+                KinesisFirehoseSink.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .setDeliveryStreamName(STREAM_NAME)
+                        .setMaxBatchSize(1)
+                        .setFirehoseClientProperties(
+                                createConfig(mockFirehoseContainer.getEndpoint()))
+                        .build();
+
+        KinesisFirehoseTestUtils.getSampleDataGenerator(env, NUMBER_OF_ELEMENTS).sinkTo(kdsSink);
+        env.execute("Integration Test");
+
+        List<S3Object> objects =
+                listBucketObjects(
+                        createS3Client(mockFirehoseContainer.getEndpoint(), httpClient),
+                        BUCKET_NAME);
+        assertThat(objects.size()).isEqualTo(NUMBER_OF_ELEMENTS);
+        assertThat(
+                        readObjectsFromS3Bucket(
+                                s3Client,
+                                objects,
+                                BUCKET_NAME,
+                                response -> new String(response.asByteArrayUnsafe())))
+                .containsAll(KinesisFirehoseTestUtils.getSampleData(NUMBER_OF_ELEMENTS));
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
new file mode 100644
index 0000000..40e9ace
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSink}. */
+class KinesisFirehoseSinkTest {
+
+    private static final ElementConverter<String, Record> elementConverter =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @Test
+    void deliveryStreamNameMustNotBeNull() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisFirehoseSink<>(
+                                        elementConverter,
+                                        500,
+                                        16,
+                                        10000,
+                                        4 * 1024 * 1024L,
+                                        5000L,
+                                        1000 * 1024L,
+                                        false,
+                                        null,
+                                        new Properties()))
+                .withMessageContaining(
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+    }
+
+    @Test
+    void deliveryStreamNameMustNotBeEmpty() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisFirehoseSink<>(
+                                        elementConverter,
+                                        500,
+                                        16,
+                                        10000,
+                                        4 * 1024 * 1024L,
+                                        5000L,
+                                        1000 * 1024L,
+                                        false,
+                                        "",
+                                        new Properties()))
+                .withMessageContaining(
+                        "The delivery stream name must be set when initializing the KDF Sink.");
+    }
+
+    @Test
+    void firehoseSinkFailsWhenAccessKeyIdIsNotProvided() {
+        Properties properties = createConfig("https://non-exisitent-location");
+        properties.setProperty(
+                AWS_CREDENTIALS_PROVIDER, AWSConfigConstants.CredentialProvider.BASIC.toString());
+        properties.remove(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER));
+        firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+                properties, "Please set values for AWS Access Key ID");
+    }
+
+    @Test
+    void firehoseSinkFailsWhenRegionIsNotProvided() {
+        Properties properties = createConfig("https://non-exisitent-location");
+        properties.remove(AWS_REGION);
+        firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+                properties, "region must not be null.");
+    }
+
+    @Test
+    void firehoseSinkFailsWhenUnableToConnectToRemoteService() {
+        Properties properties = createConfig("https://non-exisitent-location");
+        properties.remove(TRUST_ALL_CERTIFICATES);
+        firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+                properties,
+                "Received an UnknownHostException when attempting to interact with a service.");
+    }
+
+    private void firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+            Properties properties, String errorMessage) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        KinesisFirehoseSink<String> kdsSink =
+                KinesisFirehoseSink.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .setDeliveryStreamName("non-existent-stream")
+                        .setMaxBatchSize(1)
+                        .setFirehoseClientProperties(properties)
+                        .build();
+
+        KinesisFirehoseTestUtils.getSampleDataGenerator(env, 10).sinkTo(kdsSink);
+
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(() -> env.execute("Integration Test"))
+                .havingCause()
+                .havingCause()
+                .withMessageContaining(errorMessage);
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
new file mode 100644
index 0000000..29160f7
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.concurrent.CompletionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkWriter}. */
+public class KinesisFirehoseSinkWriterTest {
+
+    private KinesisFirehoseSinkWriter<String> sinkWriter;
+
+    private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @BeforeEach
+    void setup() {
+        TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+        Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
+        sinkWriter =
+                new KinesisFirehoseSinkWriter<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        sinkInitContext,
+                        50,
+                        16,
+                        10000,
+                        4 * 1024 * 1024,
+                        5000,
+                        1000 * 1024,
+                        true,
+                        "streamName",
+                        sinkProperties);
+    }
+
+    @Test
+    void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
+        String testString = "{many hands make light work;";
+        Record record = Record.builder().data(SdkBytes.fromUtf8String(testString)).build();
+        assertThat(sinkWriter.getSizeInBytes(record))
+                .isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);
+    }
+
+    @Test
+    void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
+            throws IOException, InterruptedException {
+        TestSinkInitContext ctx = new TestSinkInitContext();
+        KinesisFirehoseSink<String> kinesisFirehoseSink =
+                new KinesisFirehoseSink<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        12,
+                        16,
+                        10000,
+                        4 * 1024 * 1024L,
+                        5000L,
+                        1000 * 1024L,
+                        true,
+                        "test-stream",
+                        AWSServicesTestUtils.createConfig("https://localhost"));
+        SinkWriter<String> writer = kinesisFirehoseSink.createWriter(ctx);
+
+        for (int i = 0; i < 12; i++) {
+            writer.write("data_bytes", null);
+        }
+        assertThatExceptionOfType(CompletionException.class)
+                .isThrownBy(() -> writer.flush(true))
+                .withCauseInstanceOf(SdkClientException.class)
+                .withMessageContaining(
+                        "Unable to execute HTTP request: Connection refused: localhost/127.0.0.1:443");
+        assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12);
+        assertThat(ctx.metricGroup().getNumRecordsSendErrorsCounter().getCount()).isEqualTo(12);
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializerTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializerTest.java
new file mode 100644
index 0000000..5264e97
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.IOException;
+
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;
+
+/** Test class for {@link KinesisFirehoseStateSerializer}. */
+class KinesisFirehoseStateSerializerTest {
+
+    private static final ElementConverter<String, Record> ELEMENT_CONVERTER =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @Test
+    void testSerializeAndDeserialize() throws IOException {
+        BufferedRequestState<Record> expectedState =
+                getTestState(ELEMENT_CONVERTER, this::getRequestSize);
+
+        KinesisFirehoseStateSerializer serializer = new KinesisFirehoseStateSerializer();
+        BufferedRequestState<Record> actualState =
+                serializer.deserialize(1, serializer.serialize(expectedState));
+
+        assertThatBufferStatesAreEqual(actualState, expectedState);
+    }
+
+    private int getRequestSize(Record requestEntry) {
+        return requestEntry.data().asByteArrayUnsafe().length;
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
new file mode 100644
index 0000000..f4dee62
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
+import software.amazon.awssdk.services.firehose.model.ExtendedS3DestinationConfiguration;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the Localstack container.
+ */
+public class KinesisFirehoseTestUtils {
+
+    private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
+
+    public static FirehoseClient createFirehoseClient(String endpoint, SdkHttpClient httpClient) {
+        return AWSServicesTestUtils.createAwsSyncClient(
+                endpoint, httpClient, FirehoseClient.builder());
+    }
+
+    public static void createDeliveryStream(
+            String deliveryStreamName,
+            String bucketName,
+            String roleARN,
+            FirehoseClient firehoseClient) {
+        ExtendedS3DestinationConfiguration s3Config =
+                ExtendedS3DestinationConfiguration.builder()
+                        .bucketARN(bucketName)
+                        .roleARN(roleARN)
+                        .build();
+        CreateDeliveryStreamRequest request =
+                CreateDeliveryStreamRequest.builder()
+                        .deliveryStreamName(deliveryStreamName)
+                        .extendedS3DestinationConfiguration(s3Config)
+                        .deliveryStreamType(DeliveryStreamType.DIRECT_PUT)
+                        .build();
+
+        firehoseClient.createDeliveryStream(request);
+    }
+
+    public static DataStream<String> getSampleDataGenerator(
+            StreamExecutionEnvironment env, int endValue) {
+        return env.fromSequence(1, endValue)
+                .map(Object::toString)
+                .returns(String.class)
+                .map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", data)));
+    }
+
+    public static List<String> getSampleData(int endValue) throws JsonProcessingException {
+        List<String> expectedElements = new ArrayList<>();
+        for (int i = 1; i <= endValue; i++) {
+            expectedElements.add(
+                    MAPPER.writeValueAsString(ImmutableMap.of("data", String.valueOf(i))));
+        }
+        return expectedElements;
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
new file mode 100644
index 0000000..532b3b6
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableOptionsBuilder;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+
+/**
+ * Test for {@link KinesisFirehoseDynamicSink} created by {@link
+ * KinesisFirehoseDynamicTableFactory}.
+ */
+class KinesisFirehoseDynamicTableFactoryTest {
+    private static final String DELIVERY_STREAM_NAME = "myDeliveryStream";
+
+    @Test
+    void testGoodTableSink() {
+        ResolvedSchema sinkSchema = defaultSinkSchema();
+        Map<String, String> sinkOptions = defaultTableOptions().build();
+
+        // Construct actual DynamicTableSink using FactoryUtil
+        KinesisFirehoseDynamicSink actualSink =
+                (KinesisFirehoseDynamicSink) createTableSink(sinkSchema, sinkOptions);
+
+        // Construct expected DynamicTableSink using factory under test
+        KinesisFirehoseDynamicSink expectedSink =
+                new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder()
+                        .setConsumedDataType(sinkSchema.toPhysicalRowDataType())
+                        .setDeliveryStream(DELIVERY_STREAM_NAME)
+                        .setFirehoseClientProperties(defaultSinkProperties())
+                        .setEncodingFormat(new TestFormatFactory.EncodingFormatMock(","))
+                        .build();
+
+        Assertions.assertThat(actualSink).isEqualTo(expectedSink);
+
+        // verify the produced sink
+        DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
+                actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+        Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
+        Assertions.assertThat(sinkFunction).isInstanceOf(KinesisFirehoseSink.class);
+    }
+
+    @Test
+    void testGoodTableSinkWithSinkOptions() {
+        ResolvedSchema sinkSchema = defaultSinkSchema();
+        Map<String, String> sinkOptions = defaultTableOptionsWithSinkOptions().build();
+
+        // Construct actual DynamicTableSink using FactoryUtil
+        KinesisFirehoseDynamicSink actualSink =
+                (KinesisFirehoseDynamicSink) createTableSink(sinkSchema, sinkOptions);
+
+        // Construct expected DynamicTableSink using factory under test
+        KinesisFirehoseDynamicSink expectedSink =
+                getDefaultSinkBuilder()
+                        .setConsumedDataType(sinkSchema.toPhysicalRowDataType())
+                        .setDeliveryStream(DELIVERY_STREAM_NAME)
+                        .setFirehoseClientProperties(defaultSinkProperties())
+                        .setEncodingFormat(new TestFormatFactory.EncodingFormatMock(","))
+                        .build();
+
+        Assertions.assertThat(actualSink).isEqualTo(expectedSink);
+
+        // verify the produced sink
+        DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
+                actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+        Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
+        Assertions.assertThat(sinkFunction).isInstanceOf(KinesisFirehoseSink.class);
+    }
+
+    private ResolvedSchema defaultSinkSchema() {
+        return ResolvedSchema.of(
+                Column.physical("name", DataTypes.STRING()),
+                Column.physical("curr_id", DataTypes.BIGINT()),
+                Column.physical("time", DataTypes.TIMESTAMP(3)));
+    }
+
+    private TableOptionsBuilder defaultTableOptionsWithSinkOptions() {
+        return defaultTableOptions()
+                .withTableOption("sink.fail-on-error", "true")
+                .withTableOption("sink.batch.max-size", "100")
+                .withTableOption("sink.requests.max-inflight", "100")
+                .withTableOption("sink.requests.max-buffered", "100")
+                .withTableOption("sink.flush-buffer.size", "1000")
+                .withTableOption("sink.flush-buffer.timeout", "1000");
+    }
+
+    private TableOptionsBuilder defaultTableOptions() {
+        String connector = KinesisFirehoseDynamicTableFactory.IDENTIFIER;
+        String format = TestFormatFactory.IDENTIFIER;
+        return new TableOptionsBuilder(connector, format)
+                // default table options
+                .withTableOption(
+                        KinesisFirehoseConnectorOptions.DELIVERY_STREAM, DELIVERY_STREAM_NAME)
+                .withTableOption("aws.region", "us-west-2")
+                .withTableOption("aws.credentials.provider", "BASIC")
+                .withTableOption("aws.credentials.basic.accesskeyid", "ververicka")
+                .withTableOption(
+                        "aws.credentials.basic.secretkey",
+                        "SuperSecretSecretSquirrel") // default format options
+                .withFormatOption(TestFormatFactory.DELIMITER, ",")
+                .withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true");
+    }
+
+    private KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder getDefaultSinkBuilder() {
+        return new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder()
+                .setFailOnError(true)
+                .setMaxBatchSize(100)
+                .setMaxInFlightRequests(100)
+                .setMaxBufferSizeInBytes(1000)
+                .setMaxBufferedRequests(100)
+                .setMaxTimeInBufferMS(1000);
+    }
+
+    private Properties defaultSinkProperties() {
+        return new Properties() {
+            {
+                setProperty("aws.region", "us-west-2");
+                setProperty("aws.credentials.provider", "BASIC");
+                setProperty("aws.credentials.provider.basic.accesskeyid", "ververicka");
+                setProperty(
+                        "aws.credentials.provider.basic.secretkey", "SuperSecretSecretSquirrel");
+            }
+        };
+    }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-connector-aws-kinesis-firehose/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 0000000..2899913
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file
diff --git a/flink-connector-aws-kinesis-firehose/src/test/resources/archunit.properties b/flink-connector-aws-kinesis-firehose/src/test/resources/archunit.properties
new file mode 100644
index 0000000..15be88c
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/resources/archunit.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# By default we allow removing existing violations, but fail when new violations are added.
+freeze.store.default.allowStoreUpdate=true
+
+# Enable this if a new (frozen) rule has been added in order to create the initial store and record the existing violations.
+#freeze.store.default.allowStoreCreation=true
+
+# Enable this to add allow new violations to be recorded.
+# NOTE: Adding new violations should be avoided when possible. If the rule was correct to flag a new
+#       violation, please try to avoid creating the violation. If the violation was created due to a
+#       shortcoming of the rule, file a JIRA issue so the rule can be improved.
+#freeze.refreeze=true
+
+freeze.store.default.path=archunit-violations
diff --git a/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties b/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..c4fa187
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-sql-connector-aws-kinesis-firehose/pom.xml b/flink-sql-connector-aws-kinesis-firehose/pom.xml
new file mode 100644
index 0000000..8a587ae
--- /dev/null
+++ b/flink-sql-connector-aws-kinesis-firehose/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+    <name>Flink : Connectors : AWS : SQL : Amazon Kinesis Data Firehose</name>
+
+    <properties>
+        <japicmp.skip>true</japicmp.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.flink:flink-connector-base</include>
+                                    <include>org.apache.flink:flink-connector-aws-base</include>
+                                    <include>org.apache.flink:flink-connector-aws-kinesis-firehose</include>
+                                    <include>software.amazon.awssdk:*</include>
+                                    <include>io.netty:*</include>
+                                    <include>org.reactivestreams:*</include>
+                                    <include>org.apache.httpcomponents:*</include>
+                                    <include>com.typesafe.netty:*</include>
+                                    <include>commons-logging:commons-logging</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>software.amazon</pattern>
+                                    <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.software.amazon
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.netty</pattern>
+                                    <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.io.netty
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.reactivestreams</pattern>
+                                    <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.org.reactivestreams
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.http</pattern>
+                                    <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.org.apache.http
+                                    </shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.typesafe.netty</pattern>
+                                    <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.com.typesafe.netty
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/flink-sql-connector-aws-kinesis-firehose/src/main/resources/META-INF/NOTICE b/flink-sql-connector-aws-kinesis-firehose/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..7cf7302
--- /dev/null
+++ b/flink-sql-connector-aws-kinesis-firehose/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,48 @@
+flink-sql-connector-aws-kinesis-firehose
+
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- software.amazon.awssdk:firehose:2.17.247
+- software.amazon.awssdk:aws-json-protocol:2.17.247
+- software.amazon.awssdk:protocol-core:2.17.247
+- software.amazon.awssdk:profiles:2.17.247
+- software.amazon.awssdk:sdk-core:2.17.247
+- software.amazon.awssdk:auth:2.17.247
+- software.amazon.awssdk:http-client-spi:2.17.247
+- software.amazon.awssdk:regions:2.17.247
+- software.amazon.awssdk:annotations:2.17.247
+- software.amazon.awssdk:utils:2.17.247
+- software.amazon.awssdk:aws-core:2.17.247
+- software.amazon.awssdk:metrics-spi:2.17.247
+- software.amazon.awssdk:apache-client:2.17.247
+- software.amazon.awssdk:netty-nio-client:2.17.247
+- software.amazon.awssdk:sts:2.17.247
+- software.amazon.awssdk:aws-query-protocol:2.17.247
+- software.amazon.awssdk:json-utils:2.17.247
+- software.amazon.awssdk:third-party-jackson-core:2.17.247
+- io.netty:netty-codec-http:4.1.70.Final
+- io.netty:netty-codec-http2:4.1.70.Final
+- io.netty:netty-codec:4.1.70.Final
+- io.netty:netty-transport:4.1.70.Final
+- io.netty:netty-resolver:4.1.70.Final
+- io.netty:netty-common:4.1.70.Final
+- io.netty:netty-buffer:4.1.70.Final
+- io.netty:netty-handler:4.1.70.Final
+- io.netty:netty-transport-native-epoll:linux-x86_64:4.1.70.Final
+- io.netty:netty-transport-native-unix-common:4.1.70.Final
+- io.netty:netty-transport-classes-epoll:4.1.70.Final
+- com.typesafe.netty:netty-reactive-streams-http:2.0.5
+- com.typesafe.netty:netty-reactive-streams:2.0.5
+- org.apache.httpcomponents:httpclient:4.5.13
+- org.apache.httpcomponents:httpcore:4.4.14
+- commons-logging:commons-logging:1.1.3
+
+This project bundles the following dependencies under the Creative Commons Zero license (https://creativecommons.org/publicdomain/zero/1.0/).
+
+- org.reactivestreams:reactive-streams:1.0.3
+
diff --git a/pom.xml b/pom.xml
index 0dc0342..d701652 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,6 +71,8 @@ under the License.
         <module>flink-connector-aws-base</module>
         <module>flink-connector-dynamodb</module>
         <module>flink-sql-connector-dynamodb</module>
+        <module>flink-connector-aws-kinesis-firehose</module>
+        <module>flink-sql-connector-aws-kinesis-firehose</module>
     </modules>
 
     <dependencies>