Posted to commits@flink.apache.org by da...@apache.org on 2022/02/01 15:35:48 UTC

[flink] branch master updated (8a9a08b -> 9050fd5)

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8a9a08b  [FLINK-25041][e2e] Improve error handling for missing environment variables in E2E tests
     new 0e908be  [FLINK-24228][connectors/firehose] Amazon Kinesis Data Firehose sink based on the Async Sink Base implemented:  - Refactored AWSUnifiedSinksUtil into a class that caters for Kinesis and Firehose  - Extracted commonalities between KDS & KDF sinks into flink-connector-aws-base  - Implemented integration test based on Localstack container  - Changing host/container ports to be different, changing HTTP1.1 to being the default, localstack issue fixed  - Added docs page, chang [...]
     new edcd0b0  [FLINK-24228][connectors/firehose] Added documentation (&.zh docs) for the new Firehose sink.
     new 9050fd5  [FLINK-24228][connectors/firehose] Allows end users to supply a serialization schema rather than an ElementConverter, thereby encapsulating the Firehose `Record` from the user, verifying stream objects in KinesisFirehoseITCase

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
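Taken together, the three commits converge on the following user-facing API, condensed here from the documentation added below. This is a minimal sketch: the class name, region, and delivery stream name are illustrative placeholders.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;

import java.util.Properties;

public class FirehoseSinkSketch {
    public static void main(String[] args) {
        // Client configuration; the region is required, credentials may come
        // from the environment or any other standard AWS credential route.
        Properties sinkProperties = new Properties();
        sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");

        // As of the third commit the builder takes a SerializationSchema
        // directly, so the Firehose Record type never surfaces in user code.
        KinesisFirehoseSink<String> sink =
                KinesisFirehoseSink.<String>builder()
                        .setFirehoseClientProperties(sinkProperties)
                        .setSerializationSchema(new SimpleStringSchema())
                        .setDeliveryStreamName("delivery-stream-name")
                        .build();
    }
}
```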


Summary of changes:
 .../docs/connectors/datastream/firehose.md         | 161 +++++++++++++++++++
 .../content/docs/connectors/datastream/firehose.md | 161 +++++++++++++++++++
 flink-connectors/flink-connector-aws-base/pom.xml  |  21 +++
 .../connector/aws/util/AWSAsyncSinkUtil.java}      | 107 +++++++------
 .../aws/testutils/AWSServicesTestUtils.java        | 173 +++++++++++++++++++++
 .../aws/testutils/LocalstackContainer.java         |  78 ++++++++++
 .../connector/aws/util/AWSAsyncSinkUtilTest.java}  | 150 +++++++++++-------
 .../pom.xml                                        |  12 --
 .../KinesisDataStreamsConfigConstants.java}        |   9 +-
 .../kinesis/sink/KinesisDataStreamsSinkWriter.java |  10 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../pom.xml                                        |  43 +++--
 .../sink/KinesisFirehoseConfigConstants.java}      |  17 +-
 .../firehose/sink/KinesisFirehoseException.java    |  54 +++++++
 .../firehose/sink/KinesisFirehoseSink.java         | 115 ++++++++++++++
 .../firehose/sink/KinesisFirehoseSinkBuilder.java  | 162 +++++++++++++++++++
 .../sink/KinesisFirehoseSinkElementConverter.java  |  74 +++++++++
 .../firehose/sink/KinesisFirehoseSinkWriter.java}  | 119 +++++++-------
 .../src/main/resources/log4j2.properties           |   0
 .../sink/KinesisFirehoseSinkBuilderTest.java       |  68 ++++++++
 .../KinesisFirehoseSinkElementConverterTest.java   |  54 +++++++
 .../firehose/sink/KinesisFirehoseSinkITCase.java   | 131 ++++++++++++++++
 .../firehose/sink/KinesisFirehoseSinkTest.java     |  76 +++++++++
 .../sink/KinesisFirehoseSinkWriterTest.java        | 107 +++++++++++++
 .../sink/testutils/KinesisFirehoseTestUtils.java   |  73 +++++++++
 .../src/test}/resources/log4j2-test.properties     |   0
 flink-connectors/flink-connector-base/pom.xml      |  16 ++
 .../base/sink/writer/AsyncSinkWriterTest.java      | 120 +-------------
 .../base/sink/writer/TestSinkInitContext.java      | 139 +++++++++++++++++
 .../connectors/kinesis/proxy/KinesisProxyV2.java   |   4 +-
 .../kinesis/proxy/KinesisProxyV2Factory.java       |  18 ++-
 .../streaming/connectors/kinesis/util/AWSUtil.java |  11 +-
 .../connectors/kinesis/util/AwsV2Util.java         |   8 +
 .../connectors/kinesis/util/AwsV2UtilTest.java     |  34 ++++
 flink-connectors/pom.xml                           |   1 +
 .../org/apache/flink/util/DockerImageVersions.java |   2 +
 tools/ci/stage.sh                                  |   1 +
 37 files changed, 1977 insertions(+), 354 deletions(-)
 create mode 100644 docs/content.zh/docs/connectors/datastream/firehose.md
 create mode 100644 docs/content/docs/connectors/datastream/firehose.md
 rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java => flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java} (61%)
 create mode 100644 flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
 create mode 100644 flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
 rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java => flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java} (62%)
 copy flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/{config/AWSKinesisDataStreamsConfigConstants.java => sink/KinesisDataStreamsConfigConstants.java} (80%)
 copy flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-firehose}/pom.xml (83%)
 rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java => flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java} (62%)
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
 copy flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java => flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java} (54%)
 copy flink-connectors/{flink-connector-base => flink-connector-aws-kinesis-firehose}/src/main/resources/log4j2.properties (100%)
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
 create mode 100644 flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
 copy {flink-test-utils-parent/flink-connector-test-utils/src/main => flink-connectors/flink-connector-aws-kinesis-firehose/src/test}/resources/log4j2-test.properties (100%)
 create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java

[flink] 02/03: [FLINK-24228][connectors/firehose] Added documentation (&.zh docs) for the new Firehose sink.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit edcd0b0aec578ad73ca773fac5102cb026611306
Author: Zichen Liu <zi...@amazon.com>
AuthorDate: Mon Jan 24 18:29:37 2022 +0000

    [FLINK-24228][connectors/firehose] Added documentation (&.zh docs) for the new Firehose sink.
---
 .../docs/connectors/datastream/firehose.md         | 171 +++++++++++++++++++++
 .../content/docs/connectors/datastream/firehose.md | 171 +++++++++++++++++++++
 2 files changed, 342 insertions(+)

diff --git a/docs/content.zh/docs/connectors/datastream/firehose.md b/docs/content.zh/docs/connectors/datastream/firehose.md
new file mode 100644
index 0000000..ecc77ce
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/firehose.md
@@ -0,0 +1,171 @@
+---
+title: Firehose
+weight: 5
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Firehose Sink
+
+The Firehose sink writes to [Amazon Kinesis Data Firehose](https://aws.amazon.com/kinesis/data-firehose/).
+
+Follow the instructions from the [Amazon Kinesis Data Firehose Developer Guide](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)
+to set up a Kinesis Data Firehose delivery stream.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< artifact flink-connector-aws-kinesis-firehose >}}
+
+The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Firehose delivery stream.
+
+{{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
+{{< tab "Java" >}}
+```java
+KinesisFirehoseSinkElementConverter<String> elementConverter =
+    KinesisFirehoseSinkElementConverter.<String>builder()
+        .setSerializationSchema(new SimpleStringSchema())
+        .build();
+
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional; can be provided via alternative routes, e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+KinesisFirehoseSink<String> kdfSink =
+    KinesisFirehoseSink.<String>builder()
+        .setFirehoseClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setDeliveryStreamName("your-stream-name")     // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(50)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .build();
+
+flinkStream.sinkTo(kdfSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val elementConverter =
+    KinesisFirehoseSinkElementConverter.builder[String]()
+        .setSerializationSchema(new SimpleStringSchema())
+        .build()
+
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional; can be provided via alternative routes, e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+
+val kdfSink =
+    KinesisFirehoseSink.builder[String]()
+        .setFirehoseClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setDeliveryStreamName("your-stream-name")     // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(50)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .build()
+
+flinkStream.sinkTo(kdfSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`.
+
+1. __setFirehoseClientProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the Firehose client.
+2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__
+    * Required.
+    * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example.
+3. __setDeliveryStreamName(String deliveryStreamName)__
+    * Required.
+    * Name of the delivery stream to sink to.
+4. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records to Firehose are treated as fatal exceptions in the sink.
+5. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `500`.
+    * Maximum size of a batch to write to Firehose.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in-flight requests allowed before the sink applies backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before backpressure is applied.
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * Optional. Default: `4 * 1024 * 1024`.
+    * The maximum size (in bytes) a batch may become. All batches sent will be smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * Optional. Default: `1000 * 1024`.
+    * The maximum record size that the sink will accept; records larger than this will be automatically rejected.
+11. _build()_
+    * Constructs and returns the Firehose sink.
+
+
+## Using Custom Firehose Endpoints
+
+It is sometimes desirable to have Flink operate as a producer against a Firehose VPC endpoint or a non-AWS
+Firehose endpoint such as [Localstack](https://localstack.cloud/); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and `AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
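Note that the optional defaults in the page above appear to mirror the Amazon Kinesis Data Firehose `PutRecordBatch` service limits: at most 500 records per call, at most 1,000 KiB (`1000 * 1024` bytes) per record, and at most 4 MiB (`4 * 1024 * 1024` bytes) per call. A full batch of 500 maximum-size records would come to roughly 500 MB, so in practice the 4 MiB byte cap binds first and the sink flushes on whichever limit is hit earliest.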
diff --git a/docs/content/docs/connectors/datastream/firehose.md b/docs/content/docs/connectors/datastream/firehose.md
new file mode 100644
index 0000000..b224665
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/firehose.md
@@ -0,0 +1,171 @@
+---
+title: Firehose
+weight: 5
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Firehose Sink
+
+The Firehose sink writes to [Amazon Kinesis Data Firehose](https://aws.amazon.com/kinesis/data-firehose/).
+
+Follow the instructions from the [Amazon Kinesis Data Firehose Developer Guide](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)
+to set up a Kinesis Data Firehose delivery stream.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< artifact flink-connector-aws-kinesis-firehose >}}
+
+The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Firehose delivery stream.
+
+{{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
+{{< tab "Java" >}}
+```java
+KinesisFirehoseSinkElementConverter<String> elementConverter =
+    KinesisFirehoseSinkElementConverter.<String>builder()
+        .setSerializationSchema(new SimpleStringSchema())
+        .build();
+
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional; can be provided via alternative routes, e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+KinesisFirehoseSink<String> kdfSink =
+    KinesisFirehoseSink.<String>builder()
+        .setFirehoseClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setDeliveryStreamName("your-stream-name")     // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(50)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .build();
+
+flinkStream.sinkTo(kdfSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val elementConverter =
+    KinesisFirehoseSinkElementConverter.builder[String]()
+        .setSerializationSchema(new SimpleStringSchema())
+        .build()
+
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional; can be provided via alternative routes, e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+
+val kdfSink =
+    KinesisFirehoseSink.builder[String]()
+        .setFirehoseClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setDeliveryStreamName("your-stream-name")     // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(50)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .build()
+
+flinkStream.sinkTo(kdfSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`.
+
+1. __setFirehoseClientProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the Firehose client.
+2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__
+    * Required.
+    * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example.
+3. __setDeliveryStreamName(String deliveryStreamName)__
+    * Required.
+    * Name of the delivery stream to sink to.
+4. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records to Firehose are treated as fatal exceptions in the sink.
+5. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `500`.
+    * Maximum size of a batch to write to Firehose.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in-flight requests allowed before the sink applies backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before backpressure is applied.
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * Optional. Default: `4 * 1024 * 1024`.
+    * The maximum size (in bytes) a batch may become. All batches sent will be smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * Optional. Default: `1000 * 1024`.
+    * The maximum record size that the sink will accept; records larger than this will be automatically rejected.
+11. _build()_
+    * Constructs and returns the Firehose sink.
+
+
+## Using Custom Firehose Endpoints
+
+It is sometimes desirable to have Flink operate as a producer against a Firehose VPC endpoint or a non-AWS
+Firehose endpoint such as [Localstack](https://localstack.cloud/); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and `AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
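The endpoint override above is consumed by the same builder shown earlier in the page: passing `producerConfig` through `setFirehoseClientProperties` points the sink at the custom endpoint, which is exactly what the integration test below does with a Localstack container. A minimal sketch, reusing `producerConfig` from the example and the serialization-schema builder introduced by the next commit:

```java
KinesisFirehoseSink<String> localstackSink =
    KinesisFirehoseSink.<String>builder()
        .setFirehoseClientProperties(producerConfig)      // carries AWS_ENDPOINT and AWS_REGION
        .setSerializationSchema(new SimpleStringSchema())
        .setDeliveryStreamName("your-stream-name")
        .build();
```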

[flink] 03/03: [FLINK-24228][connectors/firehose] Allows end users to supply a serialization schema rather than an ElementConverter, thereby encapsulating the Firehose `Record` from the user, verifying stream objects in KinesisFirehoseITCase

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9050fd56591cbcb3270dd40ea370ea605bda8d39
Author: Zichen Liu <zi...@amazon.com>
AuthorDate: Mon Jan 24 18:55:17 2022 +0000

    [FLINK-24228][connectors/firehose] Allows end users to supply a serialization schema rather than an ElementConverter, thereby encapsulating the Firehose `Record` from the user, verifying stream objects in KinesisFirehoseITCase
---
 .../docs/connectors/datastream/firehose.md         | 56 +++++++---------
 .../content/docs/connectors/datastream/firehose.md | 56 +++++++---------
 .../aws/testutils/AWSServicesTestUtils.java        | 44 +++++++++++++
 .../firehose/sink/KinesisFirehoseSinkBuilder.java  | 27 ++++++--
 .../sink/KinesisFirehoseSinkElementConverter.java  |  8 +--
 .../firehose/sink/KinesisFirehoseSinkWriter.java   |  9 +--
 .../sink/KinesisFirehoseSinkBuilderTest.java       | 16 ++---
 .../KinesisFirehoseSinkElementConverterTest.java   |  2 +-
 .../firehose/sink/KinesisFirehoseSinkITCase.java   | 31 +++++----
 .../firehose/sink/examples/SinkIntoFirehose.java   | 74 ----------------------
 10 files changed, 143 insertions(+), 180 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/firehose.md b/docs/content.zh/docs/connectors/datastream/firehose.md
index ecc77ce..e958a7c 100644
--- a/docs/content.zh/docs/connectors/datastream/firehose.md
+++ b/docs/content.zh/docs/connectors/datastream/firehose.md
@@ -38,11 +38,6 @@ The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com
 {{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
 {{< tab "Java" >}}
 ```java
-KinesisFirehoseSinkElementConverter<String> elementConverter =
-    KinesisFirehoseSinkElementConverter.<String>builder()
-        .setSerializationSchema(new SimpleStringSchema())
-        .build();
-
 Properties sinkProperties = new Properties();
 // Required
 sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
@@ -52,16 +47,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
 
 KinesisFirehoseSink<String> kdfSink =
     KinesisFirehoseSink.<String>builder()
-        .setFirehoseClientProperties(sinkProperties)    // Required
-        .setElementConverter(elementConverter)         // Required
-        .setDeliveryStreamName("your-stream-name")     // Required
-        .setFailOnError(false)                         // Optional
-        .setMaxBatchSize(500)                          // Optional
-        .setMaxInFlightRequests(50)                    // Optional
-        .setMaxBufferedRequests(10_000)                // Optional
-        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
-        .setMaxTimeInBufferMS(5000)                    // Optional
-        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
         .build();
 
 flinkStream.sinkTo(kdfSink);
@@ -69,11 +64,6 @@ flinkStream.sinkTo(kdfSink);
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val elementConverter =
-    KinesisFirehoseSinkElementConverter.builder[String]()
-        .setSerializationSchema(new SimpleStringSchema())
-        .build()
-
 val sinkProperties = new Properties()
 // Required
 sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
@@ -83,16 +73,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
 
 val kdfSink =
     KinesisFirehoseSink.builder[String]()
-        .setFirehoseClientProperties(sinkProperties)    // Required
-        .setElementConverter(elementConverter)         // Required
-        .setDeliveryStreamName("your-stream-name")     // Required
-        .setFailOnError(false)                         // Optional
-        .setMaxBatchSize(500)                          // Optional
-        .setMaxInFlightRequests(50)                    // Optional
-        .setMaxBufferedRequests(10_000)                // Optional
-        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
-        .setMaxTimeInBufferMS(5000)                    // Optional
-        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
         .build()
 
 flinkStream.sinkTo(kdfSink)
@@ -102,14 +92,14 @@ flinkStream.sinkTo(kdfSink)
 
 ## Configurations
 
-Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`.
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`.
 
 1. __setFirehoseClientProperties(Properties sinkProperties)__
     * Required.
     * Supplies credentials, region and other parameters to the Firehose client.
-2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__
+2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__
     * Required.
-    * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example.
+    * Supplies a serialization schema to the sink. This schema is used to serialize elements before they are sent to Firehose.
 3. __setDeliveryStreamName(String deliveryStreamName)__
     * Required.
     * Name of the delivery stream to sink to.
diff --git a/docs/content/docs/connectors/datastream/firehose.md b/docs/content/docs/connectors/datastream/firehose.md
index b224665..20beb61 100644
--- a/docs/content/docs/connectors/datastream/firehose.md
+++ b/docs/content/docs/connectors/datastream/firehose.md
@@ -38,11 +38,6 @@ The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com
 {{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
 {{< tab "Java" >}}
 ```java
-KinesisFirehoseSinkElementConverter<String> elementConverter =
-    KinesisFirehoseSinkElementConverter.<String>builder()
-        .setSerializationSchema(new SimpleStringSchema())
-        .build();
-
 Properties sinkProperties = new Properties();
 // Required
 sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
@@ -52,16 +47,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
 
 KinesisFirehoseSink<String> kdfSink =
     KinesisFirehoseSink.<String>builder()
-        .setFirehoseClientProperties(sinkProperties)    // Required
-        .setElementConverter(elementConverter)         // Required
-        .setDeliveryStreamName("your-stream-name")     // Required
-        .setFailOnError(false)                         // Optional
-        .setMaxBatchSize(500)                          // Optional
-        .setMaxInFlightRequests(50)                    // Optional
-        .setMaxBufferedRequests(10_000)                // Optional
-        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
-        .setMaxTimeInBufferMS(5000)                    // Optional
-        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
         .build();
 
 flinkStream.sinkTo(kdfSink);
@@ -69,11 +64,6 @@ flinkStream.sinkTo(kdfSink);
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val elementConverter =
-    KinesisFirehoseSinkElementConverter.builder[String]()
-        .setSerializationSchema(new SimpleStringSchema())
-        .build()
-
 val sinkProperties = new Properties()
 // Required
 sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
@@ -83,16 +73,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
 
 val kdfSink =
     KinesisFirehoseSink.builder[String]()
-        .setFirehoseClientProperties(sinkProperties)    // Required
-        .setElementConverter(elementConverter)         // Required
-        .setDeliveryStreamName("your-stream-name")     // Required
-        .setFailOnError(false)                         // Optional
-        .setMaxBatchSize(500)                          // Optional
-        .setMaxInFlightRequests(50)                    // Optional
-        .setMaxBufferedRequests(10_000)                // Optional
-        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
-        .setMaxTimeInBufferMS(5000)                    // Optional
-        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
         .build()
 
 flinkStream.sinkTo(kdfSink)
@@ -102,14 +92,14 @@ flinkStream.sinkTo(kdfSink)
 
 ## Configurations
 
-Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`.
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`.
 
 1. __setFirehoseClientProperties(Properties sinkProperties)__
     * Required.
     * Supplies credentials, region and other parameters to the Firehose client.
-2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__
+2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__
     * Required.
-    * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example.
+    * Supplies a serialization schema to the sink. This schema is used to serialize elements before they are sent to Firehose.
 3. __setDeliveryStreamName(String deliveryStreamName)__
     * Required.
     * Name of the delivery stream to sink to.
diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
index 543d81b..ff915fa 100644
--- a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.waiters.WaiterResponse;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
@@ -31,6 +33,8 @@ import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
 import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
 import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
@@ -44,6 +48,8 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
@@ -126,4 +132,42 @@ public class AWSServicesTestUtils {
         CompletableFuture<ListObjectsResponse> res = s3.listObjects(listObjects);
         return res.get().contents();
     }
+
+    public static <T> List<T> readObjectsFromS3Bucket(
+            S3AsyncClient s3AsyncClient,
+            List<S3Object> objects,
+            String bucketName,
+            Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+        S3BucketReader bucketReader = new S3BucketReader(s3AsyncClient, bucketName);
+        return bucketReader.readObjects(objects, deserializer);
+    }
+
+    /** Helper class to read objects from S3. */
+    private static class S3BucketReader {
+        private final S3AsyncClient s3AsyncClient;
+        private final String bucketName;
+
+        public S3BucketReader(S3AsyncClient s3AsyncClient, String bucketName) {
+            this.s3AsyncClient = s3AsyncClient;
+            this.bucketName = bucketName;
+        }
+
+        public <T> List<T> readObjects(
+                List<S3Object> objectList,
+                Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+            return objectList.stream()
+                    .map(object -> readObjectWithKey(object.key(), deserializer))
+                    .collect(Collectors.toList());
+        }
+
+        public <T> T readObjectWithKey(
+                String key, Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+            GetObjectRequest getObjectRequest =
+                    GetObjectRequest.builder().bucket(bucketName).key(key).build();
+            return s3AsyncClient
+                    .getObject(getObjectRequest, AsyncResponseTransformer.toBytes())
+                    .thenApply(deserializer)
+                    .join();
+        }
+    }
 }
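The `deserializer` argument of the new `readObjectsFromS3Bucket` helper is a plain `java.util.function.Function`, so tests can decode the downloaded bytes however they need. A hypothetical sketch that decodes each S3 object as JSON (the Jackson `ObjectMapper` and the bucket name are assumptions for illustration, not part of the commit):

```java
ObjectMapper mapper = new ObjectMapper();
List<S3Object> objects = listBucketObjects(s3AsyncClient, "s3-firehose");
List<JsonNode> decoded =
        readObjectsFromS3Bucket(
                s3AsyncClient,
                objects,
                "s3-firehose",
                response -> {
                    try {
                        // ResponseBytes#asByteArray copies the buffered object body
                        return mapper.readTree(response.asByteArray());
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
```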
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
index ee22e1f..a180abc 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
 
 import software.amazon.awssdk.http.Protocol;
@@ -36,11 +37,6 @@ import static software.amazon.awssdk.http.Protocol.HTTP1_1;
  * writes String values to a Kinesis Data Firehose delivery stream named delivery-stream-name.
  *
  * <pre>{@code
- * private static final KinesisFirehoseSinkElementConverter<String> elementConverter =
- *         KinesisFirehoseSinkElementConverter.<String>builder()
- *                 .setSerializationSchema(new SimpleStringSchema())
- *                 .build();
- *
  * Properties sinkProperties = new Properties();
  * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
  *
@@ -50,6 +46,7 @@ import static software.amazon.awssdk.http.Protocol.HTTP1_1;
  *                 .setDeliveryStreamName("delivery-stream-name")
  *                 .setMaxBatchSize(20)
  *                 .setFirehoseClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
  *                 .build();
  * }</pre>
  *
@@ -73,7 +70,7 @@ public class KinesisFirehoseSinkBuilder<InputT>
 
     private static final int DEFAULT_MAX_BATCH_SIZE = 500;
     private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
-    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000;
     private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 4 * 1024 * 1024;
     private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
     private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000 * 1024;
@@ -83,6 +80,7 @@ public class KinesisFirehoseSinkBuilder<InputT>
     private Boolean failOnError;
     private String deliveryStreamName;
     private Properties firehoseClientProperties;
+    private SerializationSchema<InputT> serializationSchema;
 
     KinesisFirehoseSinkBuilder() {}
 
@@ -100,6 +98,19 @@ public class KinesisFirehoseSinkBuilder<InputT>
     }
 
     /**
+     * Allows the user to specify a serialization schema to serialize each record to persist to
+     * Firehose.
+     *
+     * @param serializationSchema serialization schema to use
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setSerializationSchema(
+            SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+        return this;
+    }
+
+    /**
      * If writing to Kinesis Data Firehose results in a partial or full failure being returned, the
      * job will fail immediately with a {@link KinesisFirehoseException} if failOnError is set.
      *
@@ -134,7 +145,9 @@ public class KinesisFirehoseSinkBuilder<InputT>
     @Override
     public KinesisFirehoseSink<InputT> build() {
         return new KinesisFirehoseSink<>(
-                getElementConverter(),
+                KinesisFirehoseSinkElementConverter.<InputT>builder()
+                        .setSerializationSchema(serializationSchema)
+                        .build(),
                 Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
                 Optional.ofNullable(getMaxInFlightRequests())
                         .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
index 45b4186..cca749c 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.connector.firehose.sink;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -31,7 +31,7 @@ import software.amazon.awssdk.services.firehose.model.Record;
  * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
  * {@link Record} that may be persisted.
  */
-@PublicEvolving
+@Internal
 public class KinesisFirehoseSinkElementConverter<InputT>
         implements ElementConverter<InputT, Record> {
 
@@ -54,7 +54,6 @@ public class KinesisFirehoseSinkElementConverter<InputT>
     }
 
     /** A builder for the KinesisFirehoseSinkElementConverter. */
-    @PublicEvolving
     public static class Builder<InputT> {
 
         private SerializationSchema<InputT> serializationSchema;
@@ -68,8 +67,7 @@ public class KinesisFirehoseSinkElementConverter<InputT>
         public KinesisFirehoseSinkElementConverter<InputT> build() {
             Preconditions.checkNotNull(
                     serializationSchema,
-                    "No SerializationSchema was supplied to the "
-                            + "KinesisFirehoseSinkElementConverter builder.");
+                    "No SerializationSchema was supplied to the " + "KinesisFirehoseSink builder.");
             return new KinesisFirehoseSinkElementConverter<>(serializationSchema);
         }
     }
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index 4002ec3..b41103f 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -37,7 +37,6 @@ import software.amazon.awssdk.services.firehose.model.Record;
 import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -116,7 +115,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
 
     @Override
     protected void submitRequestEntries(
-            List<Record> requestEntries, Consumer<Collection<Record>> requestResult) {
+            List<Record> requestEntries, Consumer<List<Record>> requestResult) {
 
         PutRecordBatchRequest batchRequest =
                 PutRecordBatchRequest.builder()
@@ -146,9 +145,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
     }
 
     private void handleFullyFailedRequest(
-            Throwable err,
-            List<Record> requestEntries,
-            Consumer<Collection<Record>> requestResult) {
+            Throwable err, List<Record> requestEntries, Consumer<List<Record>> requestResult) {
         LOG.warn(
                 "KDF Sink failed to persist {} entries to KDF first request was {}",
                 requestEntries.size(),
@@ -164,7 +161,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
     private void handlePartiallyFailedRequest(
             PutRecordBatchResponse response,
             List<Record> requestEntries,
-            Consumer<Collection<Record>> requestResult) {
+            Consumer<List<Record>> requestResult) {
         LOG.warn(
                 "KDF Sink failed to persist {} entries to KDF first request was {}",
                 requestEntries.size(),
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
index 0dc85da..88f4329 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
@@ -17,19 +17,17 @@
 
 package org.apache.flink.connector.firehose.sink;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
-import software.amazon.awssdk.services.firehose.model.Record;
 
 /** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */
 public class KinesisFirehoseSinkBuilderTest {
-    private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
-            KinesisFirehoseSinkElementConverter.<String>builder()
-                    .setSerializationSchema(new SimpleStringSchema())
-                    .build();
+
+    private static final SerializationSchema<String> SERIALIZATION_SCHEMA =
+            new SimpleStringSchema();
 
     @Test
     public void elementConverterOfSinkMustBeSetWhenBuilt() {
@@ -40,7 +38,7 @@ public class KinesisFirehoseSinkBuilderTest {
                                         .setDeliveryStreamName("deliveryStream")
                                         .build())
                 .withMessageContaining(
-                        "ElementConverter must be not null when initializing the AsyncSinkBase.");
+                        "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
     }
 
     @Test
@@ -49,7 +47,7 @@ public class KinesisFirehoseSinkBuilderTest {
                 .isThrownBy(
                         () ->
                                 KinesisFirehoseSink.<String>builder()
-                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .setSerializationSchema(SERIALIZATION_SCHEMA)
                                         .build())
                 .withMessageContaining(
                         "The delivery stream name must not be null when initializing the KDF Sink.");
@@ -62,7 +60,7 @@ public class KinesisFirehoseSinkBuilderTest {
                         () ->
                                 KinesisFirehoseSink.<String>builder()
                                         .setDeliveryStreamName("")
-                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .setSerializationSchema(SERIALIZATION_SCHEMA)
                                         .build())
                 .withMessageContaining(
                         "The delivery stream name must be set when initializing the KDF Sink.");
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
index ed0b1c7..221f444 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -35,7 +35,7 @@ public class KinesisFirehoseSinkElementConverterTest {
         Assertions.assertThatExceptionOfType(NullPointerException.class)
                 .isThrownBy(() -> KinesisFirehoseSinkElementConverter.<String>builder().build())
                 .withMessageContaining(
-                        "No SerializationSchema was supplied to the KinesisFirehoseSinkElementConverter builder.");
+                        "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
     }
 
     @Test
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
index 08cb49b..8809437 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
@@ -19,7 +19,6 @@ package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.connector.aws.testutils.LocalstackContainer;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.DockerImageVersions;
@@ -35,12 +34,12 @@ import org.slf4j.LoggerFactory;
 import org.testcontainers.utility.DockerImageName;
 import software.amazon.awssdk.core.SdkSystemSetting;
 import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
-import software.amazon.awssdk.services.firehose.model.Record;
 import software.amazon.awssdk.services.iam.IamAsyncClient;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.utils.ImmutableMap;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket;
@@ -49,6 +48,7 @@ import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getC
 import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getIamClient;
 import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getS3Client;
 import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.readObjectsFromS3Bucket;
 import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream;
 import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -56,22 +56,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */
 public class KinesisFirehoseSinkITCase {
 
-    private static final ElementConverter<String, Record> elementConverter =
-            KinesisFirehoseSinkElementConverter.<String>builder()
-                    .setSerializationSchema(new SimpleStringSchema())
-                    .build();
-
     private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
-    private S3AsyncClient s3AsyncClient;
-    private FirehoseAsyncClient firehoseAsyncClient;
-    private IamAsyncClient iamAsyncClient;
-
     private static final String ROLE_NAME = "super-role";
     private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME;
     private static final String BUCKET_NAME = "s3-firehose";
     private static final String STREAM_NAME = "s3-stream";
     private static final int NUMBER_OF_ELEMENTS = 92;
 
+    private S3AsyncClient s3AsyncClient;
+    private FirehoseAsyncClient firehoseAsyncClient;
+    private IamAsyncClient iamAsyncClient;
+
     @ClassRule
     public static LocalstackContainer mockFirehoseContainer =
             new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
@@ -106,10 +101,15 @@ public class KinesisFirehoseSinkITCase {
                         .map(Object::toString)
                         .returns(String.class)
                         .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+        List<String> expectedElements = new ArrayList<>();
+        for (int i = 1; i < NUMBER_OF_ELEMENTS; i++) {
+            expectedElements.add(
+                    mapper.writeValueAsString(ImmutableMap.of("data", String.valueOf(i))));
+        }
 
         KinesisFirehoseSink<String> kdsSink =
                 KinesisFirehoseSink.<String>builder()
-                        .setElementConverter(elementConverter)
+                        .setSerializationSchema(new SimpleStringSchema())
                         .setDeliveryStreamName(STREAM_NAME)
                         .setMaxBatchSize(1)
                         .setFirehoseClientProperties(getConfig(mockFirehoseContainer.getEndpoint()))
@@ -120,5 +120,12 @@ public class KinesisFirehoseSinkITCase {
 
         List<S3Object> objects = listBucketObjects(s3AsyncClient, BUCKET_NAME);
         assertThat(objects.size()).isEqualTo(NUMBER_OF_ELEMENTS);
+        assertThat(
+                        readObjectsFromS3Bucket(
+                                s3AsyncClient,
+                                objects,
+                                BUCKET_NAME,
+                                response -> new String(response.asByteArrayUnsafe())))
+                .containsAll(expectedElements);
     }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
deleted file mode 100644
index 3e9d0ee..0000000
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.firehose.sink.examples;
-
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
-import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkElementConverter;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
-import software.amazon.awssdk.utils.ImmutableMap;
-
-import java.util.Properties;
-
-/**
- * An example application demonstrating how to use the {@link KinesisFirehoseSink} to sink into KDF.
- *
- * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
- * SDK 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_ACCESS_KEY}
- * through environment variables etc.
- */
-public class SinkIntoFirehose {
-
-    private static final KinesisFirehoseSinkElementConverter<String> elementConverter =
-            KinesisFirehoseSinkElementConverter.<String>builder()
-                    .setSerializationSchema(new SimpleStringSchema())
-                    .build();
-
-    public static void main(String[] args) throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(10_000);
-
-        DataStream<String> generator =
-                env.fromSequence(1, 10_000_000L)
-                        .map(Object::toString)
-                        .returns(String.class)
-                        .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
-
-        Properties sinkProperties = new Properties();
-        sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
-
-        KinesisFirehoseSink<String> kdfSink =
-                KinesisFirehoseSink.<String>builder()
-                        .setElementConverter(elementConverter)
-                        .setDeliveryStreamName("delivery-stream")
-                        .setMaxBatchSize(20)
-                        .setFirehoseClientProperties(sinkProperties)
-                        .build();
-
-        generator.sinkTo(kdfSink);
-
-        env.execute("KDF Async Sink Example Program");
-    }
-}
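
For reference, a minimal sketch of the equivalent example under the new
serialization-schema builder API (the region and delivery stream name are
placeholders; the builder calls mirror those in KinesisFirehoseSinkITCase):

    package org.apache.flink.connector.firehose.sink.examples;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.aws.config.AWSConfigConstants;
    import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.Properties;

    public class SinkIntoFirehoseSketch {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(10_000);

            // Generate a simple stream of strings to persist.
            DataStream<String> generator =
                    env.fromSequence(1, 10_000_000L)
                            .map(Object::toString)
                            .returns(String.class);

            Properties sinkProperties = new Properties();
            sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); // placeholder

            // The serialization schema replaces the removed ElementConverter.
            KinesisFirehoseSink<String> kdfSink =
                    KinesisFirehoseSink.<String>builder()
                            .setSerializationSchema(new SimpleStringSchema())
                            .setDeliveryStreamName("delivery-stream") // placeholder
                            .setMaxBatchSize(20)
                            .setFirehoseClientProperties(sinkProperties)
                            .build();

            generator.sinkTo(kdfSink);

            env.execute("KDF Async Sink Example Program");
        }
    }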

[flink] 01/03: [FLINK-24228][connectors/firehose] Amazon Kinesis Data Firehose sink based on the Async Sink Base implemented: - Refactored AWSUnifiedSinksUtil into a class that caters for Kinesis and Firehose - Extracted commonalities between KDS & KDF sinks into flink-connector-aws-base - Implemented integration test based on Localstack container - Changing host/container ports to be different, changing HTTP1.1 to being the default, localstack issue fixed - Added docs page, changed type in Firehose [...]

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e908be6e5344f6220a1329580d9e4535cd6ceb6
Author: Zichen Liu <zi...@amazon.com>
AuthorDate: Thu Dec 23 11:57:09 2021 +0000

    [FLINK-24228][connectors/firehose] Implemented an Amazon Kinesis Data Firehose sink based on the Async Sink Base:
     - Refactored AWSUnifiedSinksUtil into a class that caters for both Kinesis and Firehose
     - Extracted commonalities between the KDS & KDF sinks into flink-connector-aws-base
     - Implemented an integration test based on a Localstack container
     - Changed the host and container ports to be different, made HTTP 1.1 the default, fixed a Localstack issue
     - Added a docs page, changed a type in Firehose, turned logging off, removed unused dependencies.
---
 flink-connectors/flink-connector-aws-base/pom.xml  |  21 +++
 .../connector/aws/util/AWSAsyncSinkUtil.java}      | 107 ++++++++-------
 .../aws/testutils/AWSServicesTestUtils.java        | 129 ++++++++++++++++++
 .../aws/testutils/LocalstackContainer.java         |  78 +++++++++++
 .../connector/aws/util/AWSAsyncSinkUtilTest.java}  | 150 ++++++++++++---------
 .../pom.xml                                        |  12 --
 .../KinesisDataStreamsConfigConstants.java}        |   9 +-
 .../kinesis/sink/KinesisDataStreamsSinkWriter.java |  10 +-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../pom.xml                                        |  43 +++---
 .../sink/KinesisFirehoseConfigConstants.java}      |  17 ++-
 .../firehose/sink/KinesisFirehoseException.java    |  54 ++++++++
 .../firehose/sink/KinesisFirehoseSink.java         | 115 ++++++++++++++++
 .../firehose/sink/KinesisFirehoseSinkBuilder.java  | 149 ++++++++++++++++++++
 .../sink/KinesisFirehoseSinkElementConverter.java  |  76 +++++++++++
 .../firehose/sink/KinesisFirehoseSinkWriter.java}  | 120 +++++++++--------
 .../src/main/resources/log4j2.properties           |  25 ++++
 .../sink/KinesisFirehoseSinkBuilderTest.java       |  70 ++++++++++
 .../KinesisFirehoseSinkElementConverterTest.java   |  54 ++++++++
 .../firehose/sink/KinesisFirehoseSinkITCase.java   | 124 +++++++++++++++++
 .../firehose/sink/KinesisFirehoseSinkTest.java     |  76 +++++++++++
 .../sink/KinesisFirehoseSinkWriterTest.java        | 107 +++++++++++++++
 .../firehose/sink/examples/SinkIntoFirehose.java   |  74 ++++++++++
 .../sink/testutils/KinesisFirehoseTestUtils.java   |  73 ++++++++++
 .../src/test/resources/log4j2-test.properties      |   2 +-
 flink-connectors/flink-connector-base/pom.xml      |  16 +++
 .../base/sink/writer/AsyncSinkWriterTest.java      | 120 +----------------
 .../base/sink/writer/TestSinkInitContext.java      | 139 +++++++++++++++++++
 .../connectors/kinesis/proxy/KinesisProxyV2.java   |   4 +-
 .../kinesis/proxy/KinesisProxyV2Factory.java       |  18 ++-
 .../streaming/connectors/kinesis/util/AWSUtil.java |  11 +-
 .../connectors/kinesis/util/AwsV2Util.java         |   8 ++
 .../connectors/kinesis/util/AwsV2UtilTest.java     |  34 +++++
 flink-connectors/pom.xml                           |   1 +
 .../org/apache/flink/util/DockerImageVersions.java |   2 +
 tools/ci/stage.sh                                  |   1 +
 36 files changed, 1697 insertions(+), 354 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-base/pom.xml b/flink-connectors/flink-connector-aws-base/pom.xml
index b774b2e..870e487 100644
--- a/flink-connectors/flink-connector-aws-base/pom.xml
+++ b/flink-connectors/flink-connector-aws-base/pom.xml
@@ -63,6 +63,27 @@ under the License.
 			<artifactId>sts</artifactId>
 			<version>${aws.sdk.version}</version>
 		</dependency>
+
+		<!-- Test dependencies -->
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>testcontainers</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>s3</artifactId>
+			<version>${aws.sdk.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>iam</artifactId>
+			<version>${aws.sdk.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
similarity index 61%
rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java
rename to flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
index 1ec75aa..1256142 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtil.java
+++ b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
@@ -15,24 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.util;
+package org.apache.flink.connector.aws.util;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.aws.util.AWSGeneralUtil;
-import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
 import software.amazon.awssdk.core.client.config.SdkClientOption;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
-import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
-import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
 
 import java.net.URI;
 import java.util.Optional;
@@ -40,11 +37,10 @@ import java.util.Properties;
 
 /** Some utilities specific to Amazon Web Services. */
 @Internal
-public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil {
+public class AWSAsyncSinkUtil extends AWSGeneralUtil {
 
-    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
-    private static final String USER_AGENT_FORMAT =
-            AWSKinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT + " V2";
+    /** V2 suffix to denote the unified sinks. V1 sinks are based on KPL etc. */
+    static final String V2_USER_AGENT_SUFFIX = " V2";
 
     /**
      * Creates a user agent prefix for Flink. This can be used by HTTP Clients.
@@ -62,51 +58,61 @@ public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil {
 
     /**
      * @param configProps configuration properties
-     * @param httpClient the underlying HTTP client used to talk to Kinesis
-     * @return a new Amazon Kinesis Client
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Client
      */
-    public static KinesisAsyncClient createKinesisAsyncClient(
-            final Properties configProps, final SdkAsyncHttpClient httpClient) {
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final SdkAsyncHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
         SdkClientConfiguration clientConfiguration = SdkClientConfiguration.builder().build();
-        return createKinesisAsyncClient(configProps, clientConfiguration, httpClient);
+        return createAwsAsyncClient(
+                configProps,
+                clientConfiguration,
+                httpClient,
+                clientBuilder,
+                awsUserAgentPrefixFormat,
+                awsClientUserAgentPrefix);
     }
 
     /**
      * @param configProps configuration properties
      * @param clientConfiguration the AWS SDK v2 config to instantiate the client
-     * @param httpClient the underlying HTTP client used to talk to Kinesis
-     * @return a new Amazon Kinesis Client
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Client
      */
-    public static KinesisAsyncClient createKinesisAsyncClient(
-            final Properties configProps,
-            final SdkClientConfiguration clientConfiguration,
-            final SdkAsyncHttpClient httpClient) {
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final SdkClientConfiguration clientConfiguration,
+                    final SdkAsyncHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
         String flinkUserAgentPrefix =
-                Optional.ofNullable(
-                                configProps.getProperty(
-                                        AWSKinesisDataStreamsConfigConstants
-                                                .KINESIS_CLIENT_USER_AGENT_PREFIX))
-                        .orElse(formatFlinkUserAgentPrefix(USER_AGENT_FORMAT));
+                Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix))
+                        .orElse(
+                                formatFlinkUserAgentPrefix(
+                                        awsUserAgentPrefixFormat + V2_USER_AGENT_SUFFIX));
 
         final ClientOverrideConfiguration overrideConfiguration =
                 createClientOverrideConfiguration(
                         clientConfiguration,
                         ClientOverrideConfiguration.builder(),
                         flinkUserAgentPrefix);
-        final KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder();
 
-        return createKinesisAsyncClient(
-                configProps, clientBuilder, httpClient, overrideConfiguration);
-    }
-
-    @VisibleForTesting
-    static ClientOverrideConfiguration createClientOverrideConfiguration(
-            final SdkClientConfiguration config,
-            final ClientOverrideConfiguration.Builder overrideConfigurationBuilder) {
-        return createClientOverrideConfiguration(
-                config,
-                overrideConfigurationBuilder,
-                formatFlinkUserAgentPrefix(USER_AGENT_FORMAT));
+        return createAwsAsyncClient(configProps, clientBuilder, httpClient, overrideConfiguration);
     }
 
     @VisibleForTesting
@@ -131,11 +137,16 @@ public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil {
     }
 
     @VisibleForTesting
-    static KinesisAsyncClient createKinesisAsyncClient(
-            final Properties configProps,
-            final KinesisAsyncClientBuilder clientBuilder,
-            final SdkAsyncHttpClient httpClient,
-            final ClientOverrideConfiguration overrideConfiguration) {
+    static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final T clientBuilder,
+                    final SdkAsyncHttpClient httpClient,
+                    final ClientOverrideConfiguration overrideConfiguration) {
 
         if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
             final URI endpointOverride =
@@ -150,10 +161,4 @@ public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil {
                 .region(getRegion(configProps))
                 .build();
     }
-
-    public static boolean isRecoverableException(Exception e) {
-        Throwable cause = e.getCause();
-        return cause instanceof LimitExceededException
-                || cause instanceof ProvisionedThroughputExceededException;
-    }
 }
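
The factory is now generic over the SDK client-builder type. A sketch of how a
service-specific writer can obtain its client through it (mirroring the
KinesisDataStreamsSinkWriter change below; the Firehose constants are
introduced later in this commit, and the wrapper class is hypothetical):

    import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
    import org.apache.flink.connector.aws.util.AWSGeneralUtil;
    import org.apache.flink.connector.firehose.sink.KinesisFirehoseConfigConstants;

    import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
    import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;

    import java.util.Properties;

    public class FirehoseClientFactorySketch {

        // Builds a FirehoseAsyncClient through the shared generic factory.
        static FirehoseAsyncClient createClient(Properties clientProperties) {
            SdkAsyncHttpClient httpClient =
                    AWSGeneralUtil.createAsyncHttpClient(clientProperties);
            return AWSAsyncSinkUtil.createAwsAsyncClient(
                    clientProperties,
                    httpClient,
                    FirehoseAsyncClient.builder(),
                    KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT,
                    KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
        }
    }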
diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
new file mode 100644
index 0000000..543d81b
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.testutils;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.waiters.WaiterResponse;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.iam.IamAsyncClient;
+import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
+import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.waiters.S3AsyncWaiter;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the Localstack container.
+ */
+public class AWSServicesTestUtils {
+
+    private static final String ACCESS_KEY_ID = "accessKeyId";
+    private static final String SECRET_ACCESS_KEY = "secretAccessKey";
+
+    public static S3AsyncClient getS3Client(String endpoint) throws URISyntaxException {
+        return S3AsyncClient.builder()
+                .httpClient(getHttpClient(endpoint))
+                .region(Region.AP_SOUTHEAST_1)
+                .endpointOverride(new URI(endpoint))
+                .credentialsProvider(getDefaultCredentials())
+                .build();
+    }
+
+    public static IamAsyncClient getIamClient(String endpoint) throws URISyntaxException {
+        return IamAsyncClient.builder()
+                .httpClient(getHttpClient(endpoint))
+                .region(Region.AWS_GLOBAL)
+                .endpointOverride(new URI(endpoint))
+                .credentialsProvider(getDefaultCredentials())
+                .build();
+    }
+
+    public static AwsCredentialsProvider getDefaultCredentials() {
+        return StaticCredentialsProvider.create(
+                AwsBasicCredentials.create(ACCESS_KEY_ID, SECRET_ACCESS_KEY));
+    }
+
+    public static Properties getConfig(String endpoint) {
+        Properties config = new Properties();
+        config.setProperty(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
+        config.setProperty(AWS_ENDPOINT, endpoint);
+        config.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), ACCESS_KEY_ID);
+        config.setProperty(
+                AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), SECRET_ACCESS_KEY);
+        config.setProperty(TRUST_ALL_CERTIFICATES, "true");
+        return config;
+    }
+
+    public static SdkAsyncHttpClient getHttpClient(String endpoint) {
+        return AWSGeneralUtil.createAsyncHttpClient(getConfig(endpoint));
+    }
+
+    public static void createBucket(S3AsyncClient s3Client, String bucketName)
+            throws ExecutionException, InterruptedException {
+        CreateBucketRequest bucketRequest =
+                CreateBucketRequest.builder().bucket(bucketName).build();
+        s3Client.createBucket(bucketRequest);
+
+        HeadBucketRequest bucketRequestWait =
+                HeadBucketRequest.builder().bucket(bucketName).build();
+
+        S3AsyncWaiter s3Waiter = s3Client.waiter();
+        CompletableFuture<WaiterResponse<HeadBucketResponse>> waiterResponseFuture =
+                s3Waiter.waitUntilBucketExists(bucketRequestWait);
+
+        waiterResponseFuture.get();
+    }
+
+    public static void createIAMRole(IamAsyncClient iam, String roleName)
+            throws ExecutionException, InterruptedException {
+        CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build();
+
+        CompletableFuture<CreateRoleResponse> responseFuture = iam.createRole(request);
+        responseFuture.get();
+    }
+
+    public static List<S3Object> listBucketObjects(S3AsyncClient s3, String bucketName)
+            throws ExecutionException, InterruptedException {
+        ListObjectsRequest listObjects = ListObjectsRequest.builder().bucket(bucketName).build();
+        CompletableFuture<ListObjectsResponse> res = s3.listObjects(listObjects);
+        return res.get().contents();
+    }
+}
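
A short sketch exercising these helpers against a running Localstack endpoint
(the endpoint URL, bucket name, and wrapper class are placeholders):

    import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;

    import software.amazon.awssdk.services.s3.S3AsyncClient;
    import software.amazon.awssdk.services.s3.model.S3Object;

    import java.util.List;

    public class AwsServicesTestUtilsSketch {

        public static void main(String[] args) throws Exception {
            String endpoint = "https://localhost:4566"; // placeholder Localstack endpoint
            S3AsyncClient s3Client = AWSServicesTestUtils.getS3Client(endpoint);
            AWSServicesTestUtils.createBucket(s3Client, "example-bucket");
            List<S3Object> objects =
                    AWSServicesTestUtils.listBucketObjects(s3Client, "example-bucket");
            System.out.println("Objects in bucket: " + objects.size());
        }
    }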
diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
new file mode 100644
index 0000000..bdc0517
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.testutils;
+
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A class wrapping the Localstack container that provides mock implementations of many common AWS
+ * services.
+ */
+public class LocalstackContainer extends GenericContainer<LocalstackContainer> {
+
+    private static final int CONTAINER_PORT = 4566;
+
+    public LocalstackContainer(DockerImageName imageName) {
+        super(imageName);
+        withExposedPorts(CONTAINER_PORT);
+        waitingFor(new ListBucketObjectsWaitStrategy());
+    }
+
+    public String getEndpoint() {
+        return String.format("https://%s:%s", getHost(), getMappedPort(CONTAINER_PORT));
+    }
+
+    private class ListBucketObjectsWaitStrategy extends AbstractWaitStrategy {
+        private static final int TRANSACTIONS_PER_SECOND = 1;
+
+        private final RateLimiter rateLimiter =
+                RateLimiterBuilder.newBuilder()
+                        .withRate(TRANSACTIONS_PER_SECOND, SECONDS)
+                        .withConstantThroughput()
+                        .build();
+
+        @Override
+        protected void waitUntilReady() {
+            Unreliables.retryUntilSuccess(
+                    (int) startupTimeout.getSeconds(),
+                    SECONDS,
+                    () -> rateLimiter.getWhenReady(this::list));
+        }
+
+        private List<S3Object> list()
+                throws ExecutionException, InterruptedException, URISyntaxException {
+            String bucketName = "bucket-name-not-to-be-used";
+            S3AsyncClient client = AWSServicesTestUtils.getS3Client(getEndpoint());
+            AWSServicesTestUtils.createBucket(client, bucketName);
+            return AWSServicesTestUtils.listBucketObjects(client, bucketName);
+        }
+    }
+}
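
Usage follows the standard Testcontainers JUnit pattern, as in the integration
test added by this commit; a minimal sketch (the test class is hypothetical):

    import org.apache.flink.connector.aws.testutils.LocalstackContainer;
    import org.apache.flink.util.DockerImageVersions;

    import org.junit.ClassRule;
    import org.junit.Test;
    import org.testcontainers.utility.DockerImageName;

    public class LocalstackContainerSketchTest {

        @ClassRule
        public static LocalstackContainer mockContainer =
                new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));

        @Test
        public void endpointIsReachable() {
            // getEndpoint() maps the fixed container port 4566 to a random host port.
            System.out.println(mockContainer.getEndpoint());
        }
    }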
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
similarity index 62%
rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java
rename to flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
index 0d6eae8..ccecdb8 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/util/AWSKinesisDataStreamsUtilTest.java
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.util;
-
-import org.apache.flink.connector.aws.util.TestUtil;
-import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
+package org.apache.flink.connector.aws.util;
 
 import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
@@ -29,18 +31,14 @@ import software.amazon.awssdk.core.client.config.SdkClientOption;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
-import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
 
 import java.net.URI;
 import java.time.Duration;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.apache.flink.connector.aws.util.AWSAsyncSinkUtil.formatFlinkUserAgentPrefix;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
@@ -48,20 +46,23 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-/** Tests for {@link AWSKinesisDataStreamsUtil}. */
-public class AWSKinesisDataStreamsUtilTest {
+/** Tests for {@link AWSAsyncSinkUtil}. */
+public class AWSAsyncSinkUtilTest {
+
     private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT =
-            AWSKinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT + " V2";
+            "Apache Flink %s (%s) *Destination* Connector";
+    private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT_V2 =
+            "Apache Flink %s (%s) *Destination* Connector V2";
 
     @Test
     public void testCreateKinesisAsyncClient() {
         Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
-        KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+        MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
         ClientOverrideConfiguration clientOverrideConfiguration =
                 ClientOverrideConfiguration.builder().build();
         SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build();
 
-        AWSKinesisDataStreamsUtil.createKinesisAsyncClient(
+        AWSAsyncSinkUtil.createAwsAsyncClient(
                 properties, builder, httpClient, clientOverrideConfiguration);
 
         verify(builder).overrideConfiguration(clientOverrideConfiguration);
@@ -77,12 +78,12 @@ public class AWSKinesisDataStreamsUtilTest {
         Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
         properties.setProperty(AWS_ENDPOINT, "https://localhost");
 
-        KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+        MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
         ClientOverrideConfiguration clientOverrideConfiguration =
                 ClientOverrideConfiguration.builder().build();
         SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build();
 
-        AWSKinesisDataStreamsUtil.createKinesisAsyncClient(
+        AWSAsyncSinkUtil.createAwsAsyncClient(
                 properties, builder, httpClient, clientOverrideConfiguration);
 
         verify(builder).endpointOverride(URI.create("https://localhost"));
@@ -94,14 +95,17 @@ public class AWSKinesisDataStreamsUtilTest {
 
         ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
 
-        AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration, builder);
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
 
         verify(builder).build();
         verify(builder)
                 .putAdvancedOption(
                         SdkAdvancedClientOption.USER_AGENT_PREFIX,
-                        AWSKinesisDataStreamsUtil.formatFlinkUserAgentPrefix(
-                                DEFAULT_USER_AGENT_PREFIX_FORMAT));
+                        formatFlinkUserAgentPrefix(DEFAULT_USER_AGENT_PREFIX_FORMAT_V2));
         verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, null);
         verify(builder, never()).apiCallAttemptTimeout(any());
         verify(builder, never()).apiCallTimeout(any());
@@ -116,7 +120,11 @@ public class AWSKinesisDataStreamsUtilTest {
 
         ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
 
-        AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration, builder);
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
 
         verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "suffix");
     }
@@ -130,7 +138,12 @@ public class AWSKinesisDataStreamsUtilTest {
 
         ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
 
-        AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration, builder);
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT_V2
+                                + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
 
         verify(builder).apiCallAttemptTimeout(Duration.ofMillis(500));
     }
@@ -144,49 +157,18 @@ public class AWSKinesisDataStreamsUtilTest {
 
         ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
 
-        AWSKinesisDataStreamsUtil.createClientOverrideConfiguration(clientConfiguration, builder);
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT_V2
+                                + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
 
         verify(builder).apiCallTimeout(Duration.ofMillis(600));
     }
 
-    @Test
-    public void testIsRecoverableExceptionForRecoverable() {
-        Exception recoverable = LimitExceededException.builder().build();
-        assertTrue(
-                AWSKinesisDataStreamsUtil.isRecoverableException(
-                        new ExecutionException(recoverable)));
-    }
-
-    @Test
-    public void testIsRecoverableExceptionForNonRecoverable() {
-        Exception nonRecoverable = new IllegalArgumentException("abc");
-        assertFalse(
-                AWSKinesisDataStreamsUtil.isRecoverableException(
-                        new ExecutionException(nonRecoverable)));
-    }
-
-    @Test
-    public void testIsRecoverableExceptionForRuntimeExceptionWrappingRecoverable() {
-        Exception recoverable = LimitExceededException.builder().build();
-        Exception runtime = new RuntimeException("abc", recoverable);
-        assertTrue(AWSKinesisDataStreamsUtil.isRecoverableException(runtime));
-    }
-
-    @Test
-    public void testIsRecoverableExceptionForRuntimeExceptionWrappingNonRecoverable() {
-        Exception nonRecoverable = new IllegalArgumentException("abc");
-        Exception runtime = new RuntimeException("abc", nonRecoverable);
-        assertFalse(AWSKinesisDataStreamsUtil.isRecoverableException(runtime));
-    }
-
-    @Test
-    public void testIsRecoverableExceptionForNullCause() {
-        Exception nonRecoverable = new IllegalArgumentException("abc");
-        assertFalse(AWSKinesisDataStreamsUtil.isRecoverableException(nonRecoverable));
-    }
-
-    private KinesisAsyncClientBuilder mockKinesisAsyncClientBuilder() {
-        KinesisAsyncClientBuilder builder = mock(KinesisAsyncClientBuilder.class);
+    private MockAsyncClientBuilder mockKinesisAsyncClientBuilder() {
+        MockAsyncClientBuilder builder = mock(MockAsyncClientBuilder.class);
         when(builder.overrideConfiguration(any(ClientOverrideConfiguration.class)))
                 .thenReturn(builder);
         when(builder.httpClient(any())).thenReturn(builder);
@@ -205,4 +187,52 @@ public class AWSKinesisDataStreamsUtilTest {
 
         return builder;
     }
+
+    private static class MockAsyncClientBuilder
+            implements AwsAsyncClientBuilder<MockAsyncClientBuilder, SdkClient>,
+                    AwsClientBuilder<MockAsyncClientBuilder, SdkClient> {
+
+        @Override
+        public MockAsyncClientBuilder asyncConfiguration(
+                ClientAsyncConfiguration clientAsyncConfiguration) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder httpClient(SdkAsyncHttpClient sdkAsyncHttpClient) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder httpClientBuilder(SdkAsyncHttpClient.Builder builder) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder credentialsProvider(
+                AwsCredentialsProvider awsCredentialsProvider) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder region(Region region) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder overrideConfiguration(
+                ClientOverrideConfiguration clientOverrideConfiguration) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder endpointOverride(URI uri) {
+            return null;
+        }
+
+        @Override
+        public SdkClient build() {
+            return null;
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml b/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
index 235fc3c..be50a2e 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
@@ -70,18 +70,6 @@ under the License.
 			<version>${aws.sdk.version}</version>
 		</dependency>
 
-		<dependency>
-			<groupId>software.amazon.awssdk</groupId>
-			<artifactId>netty-nio-client</artifactId>
-			<version>${aws.sdk.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>software.amazon.awssdk</groupId>
-			<artifactId>sts</artifactId>
-			<version>${aws.sdk.version}</version>
-		</dependency>
-
 		<!--Table API dependencies-->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java
similarity index 80%
copy from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
copy to flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java
index b318292..a5a6020 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.config;
+package org.apache.flink.connector.kinesis.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 
-/** Defaults for {@link AWSKinesisDataStreamsUtil}. */
+/** Defaults for {@link KinesisDataStreamsSinkWriter}. */
 @PublicEvolving
-public class AWSKinesisDataStreamsConfigConstants {
+public class KinesisDataStreamsConfigConstants {
 
     public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT =
             "Apache Flink %s (%s) Kinesis Connector";
 
-    /** Identifier for user agent prefix. */
+    /** Kinesis identifier for user agent prefix. */
     public static final String KINESIS_CLIENT_USER_AGENT_PREFIX =
             "aws.kinesis.client.user-agent-prefix";
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
index b720e5e..b291c12 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
@@ -18,10 +18,10 @@
 package org.apache.flink.connector.kinesis.sink;
 
 import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 
@@ -104,8 +104,12 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
         final SdkAsyncHttpClient httpClient =
                 AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
 
-        return AWSKinesisDataStreamsUtil.createKinesisAsyncClient(
-                kinesisClientProperties, httpClient);
+        return AWSAsyncSinkUtil.createAwsAsyncClient(
+                kinesisClientProperties,
+                httpClient,
+                KinesisAsyncClient.builder(),
+                KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+                KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
index c5339e7..c4fa187 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = INFO
+rootLogger.level = OFF
 rootLogger.appenderRef.test.ref = TestLogger
 
 appender.testlogger.name = TestLogger
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
similarity index 83%
copy from flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
copy to flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
index 235fc3c..d391616 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml
@@ -30,8 +30,8 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
-	<name>Flink : Connectors : AWS Kinesis Data Streams</name>
+	<artifactId>flink-connector-aws-kinesis-firehose</artifactId>
+	<name>Flink : Connectors : AWS Kinesis Data Firehose</name>
 	<properties>
 		<aws.sdk.version>2.17.52</aws.sdk.version>
 	</properties>
@@ -66,7 +66,7 @@ under the License.
 
 		<dependency>
 			<groupId>software.amazon.awssdk</groupId>
-			<artifactId>kinesis</artifactId>
+			<artifactId>firehose</artifactId>
 			<version>${aws.sdk.version}</version>
 		</dependency>
 
@@ -76,19 +76,6 @@ under the License.
 			<version>${aws.sdk.version}</version>
 		</dependency>
 
-		<dependency>
-			<groupId>software.amazon.awssdk</groupId>
-			<artifactId>sts</artifactId>
-			<version>${aws.sdk.version}</version>
-		</dependency>
-
-		<!--Table API dependencies-->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-common</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
 		<!-- Test dependencies -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -105,28 +92,34 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<!-- Kinesis table factory testing -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-common</artifactId>
+			<artifactId>flink-connector-base</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>testcontainers</artifactId>
 			<scope>test</scope>
 		</dependency>
 
 		<dependency>
-			<groupId>org.testcontainers</groupId>
-			<artifactId>testcontainers</artifactId>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>s3</artifactId>
+			<version>${aws.sdk.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>iam</artifactId>
+			<version>${aws.sdk.version}</version>
 			<scope>test</scope>
 		</dependency>
-    </dependencies>
+	</dependencies>
 
 	<build>
 		<plugins>
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
similarity index 62%
rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
rename to flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
index b318292..527f74a 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/config/AWSKinesisDataStreamsConfigConstants.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.config;
+package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 
-/** Defaults for {@link AWSKinesisDataStreamsUtil}. */
+/** Defaults for {@link KinesisFirehoseSinkWriter}. */
 @PublicEvolving
-public class AWSKinesisDataStreamsConfigConstants {
+public class KinesisFirehoseConfigConstants {
 
-    public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT =
-            "Apache Flink %s (%s) Kinesis Connector";
+    public static final String BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT =
+            "Apache Flink %s (%s) Firehose Connector";
 
-    /** Identifier for user agent prefix. */
-    public static final String KINESIS_CLIENT_USER_AGENT_PREFIX =
-            "aws.kinesis.client.user-agent-prefix";
+    /** Firehose identifier for user agent prefix. */
+    public static final String FIREHOSE_CLIENT_USER_AGENT_PREFIX =
+            "aws.firehose.client.user-agent-prefix";
 }
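
A sketch of overriding the default Firehose user agent prefix through the
sink's client properties (the prefix value and wrapper class are placeholders):

    import org.apache.flink.connector.firehose.sink.KinesisFirehoseConfigConstants;

    import java.util.Properties;

    public class UserAgentPrefixSketch {

        static Properties firehoseClientProperties() {
            Properties props = new Properties();
            // Overrides the "Apache Flink %s (%s) Firehose Connector" default.
            props.setProperty(
                    KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX,
                    "my-app-firehose-sink"); // placeholder prefix
            return props;
        }
    }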
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
new file mode 100644
index 0000000..ff6918c
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@link RuntimeException} wrapper indicating the exception was thrown from the Kinesis Data
+ * Firehose Sink.
+ */
+@PublicEvolving
+class KinesisFirehoseException extends RuntimeException {
+
+    public KinesisFirehoseException(final String message) {
+        super(message);
+    }
+
+    public KinesisFirehoseException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * When the flag {@code failOnError} is set in {@link KinesisFirehoseSinkWriter}, this exception
+     * is raised as soon as any exception occurs when writing to KDF.
+     */
+    static class KinesisFirehoseFailFastException extends KinesisFirehoseException {
+
+        public KinesisFirehoseFailFastException() {
+            super(
+                    "Encountered an exception while persisting records, not retrying due to {failOnError} being set.");
+        }
+
+        public KinesisFirehoseFailFastException(final Throwable cause) {
+            super(
+                    "Encountered an exception while persisting records, not retrying due to {failOnError} being set.",
+                    cause);
+        }
+    }
+}
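
With failOnError enabled, the fail-fast variant above surfaces on the first
write failure instead of retrying. A sketch of configuring it, using the
serialization-schema API from later in this series and assuming the builder
exposes a setFailOnError toggle matching the sink's constructor flag:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;

    import java.util.Properties;

    public class FailOnErrorSketch {

        static KinesisFirehoseSink<String> failFastSink(Properties clientProperties) {
            return KinesisFirehoseSink.<String>builder()
                    .setSerializationSchema(new SimpleStringSchema())
                    .setDeliveryStreamName("delivery-stream") // placeholder
                    .setFailOnError(true) // assumed toggle for the constructor flag
                    .setFirehoseClientProperties(clientProperties)
                    .build();
        }
    }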
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
new file mode 100644
index 0000000..50f04d4
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery
+ * stream using the buffering protocol specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link
+ * software.amazon.awssdk.services.firehose.FirehoseAsyncClient} to communicate with the AWS
+ * endpoint.
+ *
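+ * <p>A sink instance is constructed via the fluent {@link KinesisFirehoseSinkBuilder} and then
+ * attached to a {@code DataStream}; a minimal sketch (assuming {@code elementConverter} has been
+ * built beforehand and {@code stream} is any {@code DataStream<String>}):
+ *
+ * <pre>{@code
+ * KinesisFirehoseSink<String> kdfSink =
+ *         KinesisFirehoseSink.<String>builder()
+ *                 .setElementConverter(elementConverter)
+ *                 .setDeliveryStreamName("delivery-stream-name")
+ *                 .build();
+ *
+ * stream.sinkTo(kdfSink);
+ * }</pre>
+ *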
+ * <p>Please see the writer implementation in {@link KinesisFirehoseSinkWriter}.
+ *
+ * @param <InputT> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class KinesisFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> {
+
+    private final boolean failOnError;
+    private final String deliveryStreamName;
+    private final Properties firehoseClientProperties;
+
+    KinesisFirehoseSink(
+            ElementConverter<InputT, Record> elementConverter,
+            Integer maxBatchSize,
+            Integer maxInFlightRequests,
+            Integer maxBufferedRequests,
+            Long maxBatchSizeInBytes,
+            Long maxTimeInBufferMS,
+            Long maxRecordSizeInBytes,
+            boolean failOnError,
+            String deliveryStreamName,
+            Properties firehoseClientProperties) {
+        super(
+                elementConverter,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.deliveryStreamName =
+                Preconditions.checkNotNull(
+                        deliveryStreamName,
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+        Preconditions.checkArgument(
+                !this.deliveryStreamName.isEmpty(),
+                "The delivery stream name must be set when initializing the KDF Sink.");
+        this.failOnError = failOnError;
+        this.firehoseClientProperties = firehoseClientProperties;
+    }
+
+    /**
+     * Create a {@link KinesisFirehoseSinkBuilder} to allow the fluent construction of a new {@code
+     * KinesisFirehoseSink}.
+     *
+     * @param <InputT> type of incoming records
+     * @return {@link KinesisFirehoseSinkBuilder}
+     */
+    public static <InputT> KinesisFirehoseSinkBuilder<InputT> builder() {
+        return new KinesisFirehoseSinkBuilder<>();
+    }
+
+    @Override
+    public SinkWriter<InputT, Void, Collection<Record>> createWriter(
+            InitContext context, List<Collection<Record>> states) {
+        return new KinesisFirehoseSinkWriter<>(
+                getElementConverter(),
+                context,
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                failOnError,
+                deliveryStreamName,
+                firehoseClientProperties);
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<Collection<Record>>> getWriterStateSerializer() {
+        return Optional.empty();
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
new file mode 100644
index 0000000..ee22e1f
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link KinesisFirehoseSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link KinesisFirehoseSink} that
+ * writes String values to a Kinesis Data Firehose delivery stream named delivery-stream-name.
+ *
+ * <pre>{@code
+ * private static final KinesisFirehoseSinkElementConverter<String> elementConverter =
+ *         KinesisFirehoseSinkElementConverter.<String>builder()
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ *
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * KinesisFirehoseSink<String> kdfSink =
+ *         KinesisFirehoseSink.<String>builder()
+ *                 .setElementConverter(elementConverter)
+ *                 .setDeliveryStreamName("delivery-stream-name")
+ *                 .setMaxBatchSize(20)
+ *                 .setFirehoseClientProperties(sinkProperties)
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 500
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 10000
+ *   <li>{@code maxBatchSizeInBytes} will be 4 MB i.e. {@code 4 * 1024 * 1024}
+ *   <li>{@code maxTimeInBufferMS} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 1000 KB i.e. {@code 1000 * 1024}
+ *   <li>{@code failOnError} will be false
+ * </ul>
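+ *
+ * <p>Any of these defaults can be overridden before {@code build()} is called; for instance, a
+ * sketch that opts into fail-fast behaviour and a smaller batch size (the values shown are
+ * illustrative only):
+ *
+ * <pre>{@code
+ * KinesisFirehoseSink<String> kdfSink =
+ *         KinesisFirehoseSink.<String>builder()
+ *                 .setElementConverter(elementConverter)
+ *                 .setDeliveryStreamName("delivery-stream-name")
+ *                 .setFailOnError(true)
+ *                 .setMaxBatchSize(100)
+ *                 .setFirehoseClientProperties(sinkProperties)
+ *                 .build();
+ * }</pre>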
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class KinesisFirehoseSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, Record, KinesisFirehoseSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 500;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 4 * 1024 * 1024;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000 * 1024;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+    private Boolean failOnError;
+    private String deliveryStreamName;
+    private Properties firehoseClientProperties;
+
+    KinesisFirehoseSinkBuilder() {}
+
+    /**
+     * Sets the name of the KDF delivery stream that the sink will connect to. There is no default
+     * for this parameter, therefore, this must be provided at sink creation time otherwise the
+     * build will fail.
+     *
+     * @param deliveryStreamName the name of the delivery stream
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setDeliveryStreamName(String deliveryStreamName) {
+        this.deliveryStreamName = deliveryStreamName;
+        return this;
+    }
+
+    /**
+     * If {@code failOnError} is set, the job will fail immediately with a {@link
+     * KinesisFirehoseException} whenever writing to Kinesis Data Firehose partially or fully fails.
+     *
+     * @param failOnError whether to fail on error
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setFailOnError(boolean failOnError) {
+        this.failOnError = failOnError;
+        return this;
+    }
+
+    /**
+     * A set of properties used by the sink to create the Firehose client. This may be used to set
+     * the AWS region, credentials, etc. See the docs for usage and syntax.
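+     *
+     * <p>E.g. (the region value shown is illustrative only):
+     *
+     * <pre>{@code
+     * Properties sinkProperties = new Properties();
+     * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+     * }</pre>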
+     *
+     * @param firehoseClientProperties Firehose client properties
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setFirehoseClientProperties(
+            Properties firehoseClientProperties) {
+        this.firehoseClientProperties = firehoseClientProperties;
+        return this;
+    }
+
+    private Properties getClientPropertiesWithDefaultHttpProtocol() {
+        Properties clientProperties =
+                Optional.ofNullable(firehoseClientProperties).orElse(new Properties());
+        // Properties#getProperty only returns String values, so store the default as a String
+        clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL.toString());
+        return clientProperties;
+    }
+
+    @Override
+    public KinesisFirehoseSink<InputT> build() {
+        return new KinesisFirehoseSink<>(
+                getElementConverter(),
+                Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
+                Optional.ofNullable(getMaxInFlightRequests())
+                        .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
+                Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
+                Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
+                Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
+                Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
+                Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR),
+                deliveryStreamName,
+                getClientPropertiesWithDefaultHttpProtocol());
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
new file mode 100644
index 0000000..45b4186
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS Firehose SDK v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
+ * {@link Record} that may be persisted.
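+ *
+ * <p>For example, a converter for {@code String} elements can be built as follows (mirroring the
+ * usage shown in {@link KinesisFirehoseSinkBuilder}):
+ *
+ * <pre>{@code
+ * KinesisFirehoseSinkElementConverter<String> elementConverter =
+ *         KinesisFirehoseSinkElementConverter.<String>builder()
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }</pre>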
+ */
+@PublicEvolving
+public class KinesisFirehoseSinkElementConverter<InputT>
+        implements ElementConverter<InputT, Record> {
+
+    /** A serialization schema to specify how the input element should be serialized. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private KinesisFirehoseSinkElementConverter(SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public Record apply(InputT element, SinkWriter.Context context) {
+        return Record.builder()
+                .data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
+                .build();
+    }
+
+    public static <InputT> Builder<InputT> builder() {
+        return new Builder<>();
+    }
+
+    /** A builder for the KinesisFirehoseSinkElementConverter. */
+    @PublicEvolving
+    public static class Builder<InputT> {
+
+        private SerializationSchema<InputT> serializationSchema;
+
+        public Builder<InputT> setSerializationSchema(
+                SerializationSchema<InputT> serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        public KinesisFirehoseSinkElementConverter<InputT> build() {
+            Preconditions.checkNotNull(
+                    serializationSchema,
+                    "No SerializationSchema was supplied to the "
+                            + "KinesisFirehoseSinkElementConverter builder.");
+            return new KinesisFirehoseSinkElementConverter<>(serializationSchema);
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
similarity index 54%
copy from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
copy to flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index b720e5e..4002ec3 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -15,27 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.sink;
+package org.apache.flink.connector.firehose.sink;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
-import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
+import software.amazon.awssdk.services.firehose.model.Record;
+import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -44,35 +46,36 @@ import java.util.concurrent.CompletionException;
 import java.util.function.Consumer;
 
 /**
- * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis Data Streams. More
+ * Sink writer created by {@link KinesisFirehoseSink} to write to Kinesis Data Firehose. More
  * details on the operation of this sink writer may be found in the doc for {@link
- * KinesisDataStreamsSink}. More details on the internals of this sink writer may be found in {@link
+ * KinesisFirehoseSink}. More details on the internals of this sink writer may be found in {@link
  * AsyncSinkWriter}.
  *
- * <p>The {@link KinesisAsyncClient} used here may be configured in the standard way for the AWS SDK
- * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
  * AWS_SECRET_ACCESS_KEY} through environment variables etc.
  */
-class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {
-    private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class);
+@Internal
+class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkWriter.class);
 
     /* A counter for the total number of records that have encountered an error during put */
     private final Counter numRecordsOutErrorsCounter;
 
-    /* Name of the stream in Kinesis Data Streams */
-    private final String streamName;
+    /* Name of the delivery stream in Kinesis Data Firehose */
+    private final String deliveryStreamName;
 
     /* The sink writer metric group */
     private final SinkWriterMetricGroup metrics;
 
-    /* The asynchronous Kinesis client - construction is by kinesisClientProperties */
-    private final KinesisAsyncClient client;
+    /* The asynchronous Firehose client - construction is by firehoseClientProperties */
+    private final FirehoseAsyncClient client;
 
     /* Flag to whether fatally fail any time we encounter an exception when persisting records */
     private final boolean failOnError;
 
-    KinesisDataStreamsSinkWriter(
-            ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
+    KinesisFirehoseSinkWriter(
+            ElementConverter<InputT, Record> elementConverter,
             Sink.InitContext context,
             int maxBatchSize,
             int maxInFlightRequests,
@@ -81,8 +84,8 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
             boolean failOnError,
-            String streamName,
-            Properties kinesisClientProperties) {
+            String deliveryStreamName,
+            Properties firehoseClientProperties) {
         super(
                 elementConverter,
                 context,
@@ -93,38 +96,43 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
                 maxTimeInBufferMS,
                 maxRecordSizeInBytes);
         this.failOnError = failOnError;
-        this.streamName = streamName;
+        this.deliveryStreamName = deliveryStreamName;
         this.metrics = context.metricGroup();
         this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
-        this.client = buildClient(kinesisClientProperties);
+        this.client = buildClient(firehoseClientProperties);
     }
 
-    private KinesisAsyncClient buildClient(Properties kinesisClientProperties) {
-
+    private FirehoseAsyncClient buildClient(Properties firehoseClientProperties) {
         final SdkAsyncHttpClient httpClient =
-                AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
-
-        return AWSKinesisDataStreamsUtil.createKinesisAsyncClient(
-                kinesisClientProperties, httpClient);
+                AWSGeneralUtil.createAsyncHttpClient(firehoseClientProperties);
+
+        return AWSAsyncSinkUtil.createAwsAsyncClient(
+                firehoseClientProperties,
+                httpClient,
+                FirehoseAsyncClient.builder(),
+                KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT,
+                KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
     }
 
     @Override
     protected void submitRequestEntries(
-            List<PutRecordsRequestEntry> requestEntries,
-            Consumer<List<PutRecordsRequestEntry>> requestResult) {
+            List<Record> requestEntries, Consumer<Collection<Record>> requestResult) {
 
-        PutRecordsRequest batchRequest =
-                PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build();
+        PutRecordBatchRequest batchRequest =
+                PutRecordBatchRequest.builder()
+                        .records(requestEntries)
+                        .deliveryStreamName(deliveryStreamName)
+                        .build();
 
-        LOG.trace("Request to submit {} entries to KDS using KDS Sink.", requestEntries.size());
+        LOG.trace("Request to submit {} entries to KDF using KDF Sink.", requestEntries.size());
 
-        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);
+        CompletableFuture<PutRecordBatchResponse> future = client.putRecordBatch(batchRequest);
 
         future.whenComplete(
                 (response, err) -> {
                     if (err != null) {
                         handleFullyFailedRequest(err, requestEntries, requestResult);
-                    } else if (response.failedRecordCount() > 0) {
+                    } else if (response.failedPutCount() > 0) {
                         handlePartiallyFailedRequest(response, requestEntries, requestResult);
                     } else {
                         requestResult.accept(Collections.emptyList());
@@ -133,15 +141,19 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
     }
 
     @Override
-    protected long getSizeInBytes(PutRecordsRequestEntry requestEntry) {
+    protected long getSizeInBytes(Record requestEntry) {
         return requestEntry.data().asByteArrayUnsafe().length;
     }
 
     private void handleFullyFailedRequest(
             Throwable err,
-            List<PutRecordsRequestEntry> requestEntries,
-            Consumer<List<PutRecordsRequestEntry>> requestResult) {
-        LOG.warn("KDS Sink failed to persist {} entries to KDS", requestEntries.size(), err);
+            List<Record> requestEntries,
+            Consumer<Collection<Record>> requestResult) {
+        LOG.warn(
+                "KDF Sink failed to persist {} entries to KDF first request was {}",
+                requestEntries.size(),
+                requestEntries.get(0).toString(),
+                err);
         numRecordsOutErrorsCounter.inc(requestEntries.size());
 
         if (isRetryable(err)) {
@@ -150,20 +162,22 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
     }
 
     private void handlePartiallyFailedRequest(
-            PutRecordsResponse response,
-            List<PutRecordsRequestEntry> requestEntries,
-            Consumer<List<PutRecordsRequestEntry>> requestResult) {
-        LOG.warn("KDS Sink failed to persist {} entries to KDS", response.failedRecordCount());
-        numRecordsOutErrorsCounter.inc(response.failedRecordCount());
+            PutRecordBatchResponse response,
+            List<Record> requestEntries,
+            Consumer<Collection<Record>> requestResult) {
+        LOG.warn(
+                "KDF Sink failed to persist {} entries to KDF, first request was {}",
+                response.failedPutCount(),
+                requestEntries.get(0).toString());
+        numRecordsOutErrorsCounter.inc(response.failedPutCount());
 
         if (failOnError) {
             getFatalExceptionCons()
-                    .accept(new KinesisDataStreamsException.KinesisDataStreamsFailFastException());
+                    .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException());
             return;
         }
-        List<PutRecordsRequestEntry> failedRequestEntries =
-                new ArrayList<>(response.failedRecordCount());
-        List<PutRecordsResultEntry> records = response.records();
+        List<Record> failedRequestEntries = new ArrayList<>(response.failedPutCount());
+        List<PutRecordBatchResponseEntry> records = response.requestResponses();
 
         for (int i = 0; i < records.size(); i++) {
             if (records.get(i).errorCode() != null) {
@@ -179,15 +193,13 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
                 && err.getCause() instanceof ResourceNotFoundException) {
             getFatalExceptionCons()
                     .accept(
-                            new KinesisDataStreamsException(
+                            new KinesisFirehoseException(
                                     "Encountered non-recoverable exception", err));
             return false;
         }
         if (failOnError) {
             getFatalExceptionCons()
-                    .accept(
-                            new KinesisDataStreamsException.KinesisDataStreamsFailFastException(
-                                    err));
+                    .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
             return false;
         }
 
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..c64a340
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
new file mode 100644
index 0000000..0dc85da
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */
+public class KinesisFirehoseSinkBuilderTest {
+    private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @Test
+    public void elementConverterOfSinkMustBeSetWhenBuilt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisFirehoseSink.builder()
+                                        .setDeliveryStreamName("deliveryStream")
+                                        .build())
+                .withMessageContaining(
+                        "ElementConverter must be not null when initializing the AsyncSinkBase.");
+    }
+
+    @Test
+    public void streamNameOfSinkMustBeSetWhenBuilt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisFirehoseSink.<String>builder()
+                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .build())
+                .withMessageContaining(
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+    }
+
+    @Test
+    public void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                KinesisFirehoseSink.<String>builder()
+                                        .setDeliveryStreamName("")
+                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .build())
+                .withMessageContaining(
+                        "The delivery stream name must be set when initializing the KDF Sink.");
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
new file mode 100644
index 0000000..ed0b1c7
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Covers construction and sanity checking of {@link KinesisFirehoseSinkElementConverter}. */
+public class KinesisFirehoseSinkElementConverterTest {
+
+    @Test
+    public void elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(() -> KinesisFirehoseSinkElementConverter.<String>builder().build())
+                .withMessageContaining(
+                        "No SerializationSchema was supplied to the KinesisFirehoseSinkElementConverter builder.");
+    }
+
+    @Test
+    public void elementConverterUsesProvidedSchemaToSerializeRecord() {
+        ElementConverter<String, Record> elementConverter =
+                KinesisFirehoseSinkElementConverter.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .build();
+
+        String testString = "{many hands make light work;";
+
+        Record serializedRecord = elementConverter.apply(testString, null);
+        byte[] serializedString = (new SimpleStringSchema()).serialize(testString);
+        assertThat(serializedRecord.data()).isEqualTo(SdkBytes.fromByteArray(serializedString));
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
new file mode 100644
index 0000000..08cb49b
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.Record;
+import software.amazon.awssdk.services.iam.IamAsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.List;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getConfig;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getIamClient;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getS3Client;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */
+public class KinesisFirehoseSinkITCase {
+
+    private static final ElementConverter<String, Record> elementConverter =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
+    private S3AsyncClient s3AsyncClient;
+    private FirehoseAsyncClient firehoseAsyncClient;
+    private IamAsyncClient iamAsyncClient;
+
+    private static final String ROLE_NAME = "super-role";
+    private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME;
+    private static final String BUCKET_NAME = "s3-firehose";
+    private static final String STREAM_NAME = "s3-stream";
+    private static final int NUMBER_OF_ELEMENTS = 92;
+
+    @ClassRule
+    public static LocalstackContainer mockFirehoseContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
+
+    @Before
+    public void setup() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+        s3AsyncClient = getS3Client(mockFirehoseContainer.getEndpoint());
+        firehoseAsyncClient = getFirehoseClient(mockFirehoseContainer.getEndpoint());
+        iamAsyncClient = getIamClient(mockFirehoseContainer.getEndpoint());
+    }
+
+    @After
+    public void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+    }
+
+    @Test
+    public void firehoseSinkWritesCorrectDataToMockAWSServices() throws Exception {
+        LOG.info("1 - Creating the bucket for Firehose to deliver into...");
+        createBucket(s3AsyncClient, BUCKET_NAME);
+        LOG.info("2 - Creating the IAM Role for Firehose to write into the s3 bucket...");
+        createIAMRole(iamAsyncClient, ROLE_NAME);
+        LOG.info("3 - Creating the Firehose delivery stream...");
+        createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN, firehoseAsyncClient);
+
+        ObjectMapper mapper = new ObjectMapper();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<String> generator =
+                env.fromSequence(1, NUMBER_OF_ELEMENTS)
+                        .map(Object::toString)
+                        .returns(String.class)
+                        .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+
+        KinesisFirehoseSink<String> kdfSink =
+                KinesisFirehoseSink.<String>builder()
+                        .setElementConverter(elementConverter)
+                        .setDeliveryStreamName(STREAM_NAME)
+                        .setMaxBatchSize(1)
+                        .setFirehoseClientProperties(getConfig(mockFirehoseContainer.getEndpoint()))
+                        .build();
+
+        generator.sinkTo(kdfSink);
+        env.execute("Integration Test");
+
+        List<S3Object> objects = listBucketObjects(s3AsyncClient, BUCKET_NAME);
+        assertThat(objects.size()).isEqualTo(NUMBER_OF_ELEMENTS);
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
new file mode 100644
index 0000000..164ec39
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Properties;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSink}. */
+public class KinesisFirehoseSinkTest {
+
+    private static final ElementConverter<String, Record> elementConverter =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @Test
+    public void deliveryStreamNameMustNotBeNull() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisFirehoseSink<>(
+                                        elementConverter,
+                                        500,
+                                        16,
+                                        10000,
+                                        4 * 1024 * 1024L,
+                                        5000L,
+                                        1000 * 1024L,
+                                        false,
+                                        null,
+                                        new Properties()))
+                .withMessageContaining(
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+    }
+
+    @Test
+    public void deliveryStreamNameMustNotBeEmpty() {
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new KinesisFirehoseSink<>(
+                                        elementConverter,
+                                        500,
+                                        16,
+                                        10000,
+                                        4 * 1024 * 1024L,
+                                        5000L,
+                                        1000 * 1024L,
+                                        false,
+                                        "",
+                                        new Properties()))
+                .withMessageContaining(
+                        "The delivery stream name must be set when initializing the KDF Sink.");
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
new file mode 100644
index 0000000..d840033
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+
+import org.junit.Before;
+import org.junit.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkWriter}. */
+public class KinesisFirehoseSinkWriterTest {
+
+    private KinesisFirehoseSinkWriter<String> sinkWriter;
+
+    private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    @Before
+    public void setup() {
+        TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+        Properties sinkProperties = new Properties();
+        sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+        sinkWriter =
+                new KinesisFirehoseSinkWriter<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        sinkInitContext,
+                        50,
+                        16,
+                        10000,
+                        4 * 1024 * 1024,
+                        5000,
+                        1000 * 1024,
+                        true,
+                        "streamName",
+                        sinkProperties);
+    }
+
+    @Test
+    public void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
+        String testString = "{many hands make light work;";
+        Record record = Record.builder().data(SdkBytes.fromUtf8String(testString)).build();
+        assertThat(sinkWriter.getSizeInBytes(record))
+                .isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);
+    }
+
+    @Test
+    public void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
+            throws IOException, InterruptedException {
+        Properties prop = new Properties();
+        prop.setProperty(AWSConfigConstants.AWS_REGION, Region.EU_WEST_1.toString());
+        prop.setProperty(AWS_ENDPOINT, "https://fake_aws_endpoint");
+        TestSinkInitContext ctx = new TestSinkInitContext();
+        KinesisFirehoseSink<String> kinesisFirehoseSink =
+                new KinesisFirehoseSink<>(
+                        ELEMENT_CONVERTER_PLACEHOLDER,
+                        6,
+                        16,
+                        10000,
+                        4 * 1024 * 1024L,
+                        5000L,
+                        1000 * 1024L,
+                        true,
+                        "test-stream",
+                        prop);
+        SinkWriter<String, Void, Collection<Record>> writer =
+                kinesisFirehoseSink.createWriter(ctx, new ArrayList<>());
+
+        for (int i = 0; i < 12; i++) {
+            writer.write("data_bytes", null);
+        }
+
+        assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12);
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
new file mode 100644
index 0000000..3e9d0ee
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.examples;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkElementConverter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.Properties;
+
+/**
+ * An example application demonstrating how to use the {@link KinesisFirehoseSink} to sink into KDF.
+ *
+ * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_ACCESS_KEY}
+ * through environment variables etc.
+ */
+public class SinkIntoFirehose {
+
+    private static final KinesisFirehoseSinkElementConverter<String> elementConverter =
+            KinesisFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    public static void main(String[] args) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10_000);
+
+        DataStream<String> generator =
+                env.fromSequence(1, 10_000_000L)
+                        .map(Object::toString)
+                        .returns(String.class)
+                        .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+
+        Properties sinkProperties = new Properties();
+        sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+
+        KinesisFirehoseSink<String> kdfSink =
+                KinesisFirehoseSink.<String>builder()
+                        .setElementConverter(elementConverter)
+                        .setDeliveryStreamName("delivery-stream")
+                        .setMaxBatchSize(20)
+                        .setFirehoseClientProperties(sinkProperties)
+                        .build();
+
+        generator.sinkTo(kdfSink);
+
+        env.execute("KDF Async Sink Example Program");
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
new file mode 100644
index 0000000..1de389d
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.testutils;
+
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseConfigConstants;
+
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
+import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
+import software.amazon.awssdk.services.firehose.model.ExtendedS3DestinationConfiguration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getConfig;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getHttpClient;
+
+/**
+ * A set of static helper methods for calling common AWS services against the Localstack container
+ * used in the integration tests.
+ */
+public class KinesisFirehoseTestUtils {
+
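+    // Builds a FirehoseAsyncClient whose endpoint is overridden to point at the supplied
+    // (Localstack) endpoint rather than the default AWS endpoint.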
+    public static FirehoseAsyncClient getFirehoseClient(String endpoint) throws URISyntaxException {
+        return AWSAsyncSinkUtil.createAwsAsyncClient(
+                getConfig(endpoint),
+                getHttpClient(endpoint),
+                FirehoseAsyncClient.builder().endpointOverride(new URI(endpoint)),
+                KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT,
+                KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
+    }
+
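+    // Creates a DIRECT_PUT delivery stream backed by an S3 destination and blocks until the
+    // asynchronous create call has completed.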
+    public static void createDeliveryStream(
+            String deliveryStreamName,
+            String bucketName,
+            String roleARN,
+            FirehoseAsyncClient firehoseAsyncClient)
+            throws ExecutionException, InterruptedException {
+        ExtendedS3DestinationConfiguration s3Config =
+                ExtendedS3DestinationConfiguration.builder()
+                        .bucketARN(bucketName)
+                        .roleARN(roleARN)
+                        .build();
+        CreateDeliveryStreamRequest request =
+                CreateDeliveryStreamRequest.builder()
+                        .deliveryStreamName(deliveryStreamName)
+                        .extendedS3DestinationConfiguration(s3Config)
+                        .deliveryStreamType(DeliveryStreamType.DIRECT_PUT)
+                        .build();
+
+        CompletableFuture<CreateDeliveryStreamResponse> deliveryStream =
+                firehoseAsyncClient.createDeliveryStream(request);
+        deliveryStream.get();
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
similarity index 97%
copy from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
copy to flink-connectors/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
index c5339e7..c4fa187 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = INFO
+rootLogger.level = OFF
 rootLogger.appenderRef.test.ref = TestLogger
 
 appender.testlogger.name = TestLogger
diff --git a/flink-connectors/flink-connector-base/pom.xml b/flink-connectors/flink-connector-base/pom.xml
index 751aecd..58fd369 100644
--- a/flink-connectors/flink-connector-base/pom.xml
+++ b/flink-connectors/flink-connector-base/pom.xml
@@ -72,4 +72,20 @@
 			<scope>test</scope>
 		</dependency>
 	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
 </project>
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index b508159..00a08b1 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -17,22 +17,8 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
-import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.metrics.testutils.MetricListener;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
-import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
-import org.apache.flink.util.UserCodeClassLoader;
-import org.apache.flink.util.function.RunnableWithException;
-import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -42,10 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
-import java.util.OptionalLong;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -66,12 +49,12 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class AsyncSinkWriterTest {
 
     private final List<Integer> res = new ArrayList<>();
-    private SinkInitContext sinkInitContext;
+    private TestSinkInitContext sinkInitContext;
 
     @Before
     public void before() {
         res.clear();
-        sinkInitContext = new SinkInitContext();
+        sinkInitContext = new TestSinkInitContext();
     }
 
     private void performNormalWriteOfEightyRecordsToMock()
@@ -1008,105 +991,6 @@ public class AsyncSinkWriterTest {
         }
     }
 
-    private static class SinkInitContext implements Sink.InitContext {
-
-        private static final TestProcessingTimeService processingTimeService;
-        private final MetricListener metricListener = new MetricListener();
-        private final OperatorIOMetricGroup operatorIOMetricGroup =
-                UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
-        private final SinkWriterMetricGroup metricGroup =
-                InternalSinkWriterMetricGroup.mock(
-                        metricListener.getMetricGroup(), operatorIOMetricGroup);
-
-        static {
-            processingTimeService = new TestProcessingTimeService();
-        }
-
-        @Override
-        public UserCodeClassLoader getUserCodeClassLoader() {
-            return null;
-        }
-
-        @Override
-        public MailboxExecutor getMailboxExecutor() {
-            StreamTaskActionExecutor streamTaskActionExecutor =
-                    new StreamTaskActionExecutor() {
-                        @Override
-                        public void run(RunnableWithException e) throws Exception {
-                            e.run();
-                        }
-
-                        @Override
-                        public <E extends Throwable> void runThrowing(
-                                ThrowingRunnable<E> throwingRunnable) throws E {
-                            throwingRunnable.run();
-                        }
-
-                        @Override
-                        public <R> R call(Callable<R> callable) throws Exception {
-                            return callable.call();
-                        }
-                    };
-            return new MailboxExecutorImpl(
-                    new TaskMailboxImpl(Thread.currentThread()),
-                    Integer.MAX_VALUE,
-                    streamTaskActionExecutor);
-        }
-
-        @Override
-        public Sink.ProcessingTimeService getProcessingTimeService() {
-            return new Sink.ProcessingTimeService() {
-                @Override
-                public long getCurrentProcessingTime() {
-                    return processingTimeService.getCurrentProcessingTime();
-                }
-
-                @Override
-                public void registerProcessingTimer(
-                        long time, ProcessingTimeCallback processingTimerCallback) {
-                    processingTimeService.registerTimer(
-                            time, processingTimerCallback::onProcessingTime);
-                }
-            };
-        }
-
-        @Override
-        public int getSubtaskId() {
-            return 0;
-        }
-
-        @Override
-        public int getNumberOfParallelSubtasks() {
-            return 0;
-        }
-
-        @Override
-        public SinkWriterMetricGroup metricGroup() {
-            return metricGroup;
-        }
-
-        @Override
-        public OptionalLong getRestoredCheckpointId() {
-            return OptionalLong.empty();
-        }
-
-        public TestProcessingTimeService getTestProcessingTimeService() {
-            return processingTimeService;
-        }
-
-        private Optional<Gauge<Long>> getCurrentSendTimeGauge() {
-            return metricListener.getGauge("currentSendTime");
-        }
-
-        private Counter getNumRecordsOutCounter() {
-            return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
-        }
-
-        private Counter getNumBytesOutCounter() {
-            return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
-        }
-    }
-
     /**
      * This SinkWriter releases the lock on existing threads blocked by {@code delayedStartLatch}
      * and blocks itself until {@code blockedThreadLatch} is unblocked.
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
new file mode 100644
index 0000000..fba2711
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.Callable;
+
+/** A mock implementation of a {@code Sink.InitContext} to be used in sink unit tests. */
+public class TestSinkInitContext implements Sink.InitContext {
+
+    private static final TestProcessingTimeService processingTimeService;
+    private final MetricListener metricListener = new MetricListener();
+    private final OperatorIOMetricGroup operatorIOMetricGroup =
+            UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+    private final SinkWriterMetricGroup metricGroup =
+            InternalSinkWriterMetricGroup.mock(
+                    metricListener.getMetricGroup(), operatorIOMetricGroup);
+
+    static {
+        processingTimeService = new TestProcessingTimeService();
+    }
+
+    @Override
+    public UserCodeClassLoader getUserCodeClassLoader() {
+        return null;
+    }
+
+    @Override
+    public MailboxExecutor getMailboxExecutor() {
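+        // The returned executor is backed by a mailbox bound to the calling thread; submitted
+        // actions run on that same thread when the mailbox is drained, keeping tests deterministic.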
+        StreamTaskActionExecutor streamTaskActionExecutor =
+                new StreamTaskActionExecutor() {
+                    @Override
+                    public void run(RunnableWithException e) throws Exception {
+                        e.run();
+                    }
+
+                    @Override
+                    public <E extends Throwable> void runThrowing(
+                            ThrowingRunnable<E> throwingRunnable) throws E {
+                        throwingRunnable.run();
+                    }
+
+                    @Override
+                    public <R> R call(Callable<R> callable) throws Exception {
+                        return callable.call();
+                    }
+                };
+        return new MailboxExecutorImpl(
+                new TaskMailboxImpl(Thread.currentThread()),
+                Integer.MAX_VALUE,
+                streamTaskActionExecutor);
+    }
+
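+    // Exposes a manually driven TestProcessingTimeService so that tests can control when the
+    // sink's processing-time timers fire.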
+    @Override
+    public Sink.ProcessingTimeService getProcessingTimeService() {
+        return new Sink.ProcessingTimeService() {
+            @Override
+            public long getCurrentProcessingTime() {
+                return processingTimeService.getCurrentProcessingTime();
+            }
+
+            @Override
+            public void registerProcessingTimer(
+                    long time, ProcessingTimeCallback processingTimerCallback) {
+                processingTimeService.registerTimer(
+                        time, processingTimerCallback::onProcessingTime);
+            }
+        };
+    }
+
+    @Override
+    public int getSubtaskId() {
+        return 0;
+    }
+
+    @Override
+    public int getNumberOfParallelSubtasks() {
+        return 0;
+    }
+
+    @Override
+    public SinkWriterMetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    @Override
+    public OptionalLong getRestoredCheckpointId() {
+        return OptionalLong.empty();
+    }
+
+    public TestProcessingTimeService getTestProcessingTimeService() {
+        return processingTimeService;
+    }
+
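+    // Metric accessors are public so that connector tests outside this package can assert on the
+    // writer's metrics.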
+    public Optional<Gauge<Long>> getCurrentSendTimeGauge() {
+        return metricListener.getGauge("currentSendTime");
+    }
+
+    public Counter getNumRecordsOutCounter() {
+        return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+    }
+
+    public Counter getNumBytesOutCounter() {
+        return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+    }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
index e2e6a79..0677765 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
+import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -190,7 +190,7 @@ public class KinesisProxyV2 implements KinesisProxyV2Interface {
             try {
                 response = responseSupplier.get();
             } catch (Exception ex) {
-                if (AWSKinesisDataStreamsUtil.isRecoverableException(ex)) {
+                if (AwsV2Util.isRecoverableException(ex)) {
                     long backoffMillis =
                             backoff.calculateFullJitterBackoff(
                                     jitterBase, jitterMax, jitterExponent, ++attempt);
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
index 03767cd..62f25db 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
-import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
+import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
 import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
 import org.apache.flink.util.Preconditions;
@@ -63,13 +63,17 @@ public class KinesisProxyV2Factory {
 
         Properties legacyConfigProps = new Properties(configProps);
         legacyConfigProps.setProperty(
-                AWSKinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
-                AWSKinesisDataStreamsUtil.formatFlinkUserAgentPrefix(
-                        AWSKinesisDataStreamsConfigConstants
-                                .BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+                KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
+                AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
+                        KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
 
         final KinesisAsyncClient client =
-                AWSKinesisDataStreamsUtil.createKinesisAsyncClient(legacyConfigProps, httpClient);
+                AWSAsyncSinkUtil.createAwsAsyncClient(
+                        legacyConfigProps,
+                        httpClient,
+                        KinesisAsyncClient.builder(),
+                        KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+                        KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
 
         return new KinesisProxyV2(client, httpClient, configuration, BACKOFF);
     }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 5c949fe..684234c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider;
-import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
+import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -83,9 +83,8 @@ public class AWSUtil {
             Properties configProps, ClientConfiguration awsClientConfig) {
         // set a Flink-specific user agent
         awsClientConfig.setUserAgentPrefix(
-                AWSKinesisDataStreamsUtil.formatFlinkUserAgentPrefix(
-                        AWSKinesisDataStreamsConfigConstants
-                                .BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+                AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
+                        KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
 
         // utilize automatic refreshment of credentials by directly passing the
         // AWSCredentialsProvider
@@ -133,7 +132,7 @@ public class AWSUtil {
     private static AWSCredentialsProvider getCredentialsProvider(
             final Properties configProps, final String configPrefix) {
         CredentialProvider credentialProviderType =
-                AWSKinesisDataStreamsUtil.getCredentialProviderType(configProps, configPrefix);
+                AWSAsyncSinkUtil.getCredentialProviderType(configProps, configPrefix);
 
         switch (credentialProviderType) {
             case ENV_VAR:
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
index 609e879..aa62a65 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 import org.apache.flink.annotation.Internal;
 
 import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
 import software.amazon.awssdk.utils.AttributeMap;
 
 import java.time.Duration;
@@ -70,4 +72,10 @@ public class AwsV2Util {
     public static boolean isNoneEfoRegistrationType(final Properties properties) {
         return NONE.name().equals(properties.get(EFO_REGISTRATION_TYPE));
     }
+
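+    /**
+     * An exception is considered recoverable when its cause is a throttling error from Kinesis,
+     * i.e. a {@link LimitExceededException} or a {@link ProvisionedThroughputExceededException}.
+     */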
+    public static boolean isRecoverableException(Exception e) {
+        Throwable cause = e.getCause();
+        return cause instanceof LimitExceededException
+                || cause instanceof ProvisionedThroughputExceededException;
+    }
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
index 47cf23f..ea9f03d 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
@@ -19,10 +19,12 @@ package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.junit.Test;
 import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
 import software.amazon.awssdk.utils.AttributeMap;
 
 import java.time.Duration;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.EAGER;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.LAZY;
@@ -131,4 +133,36 @@ public class AwsV2UtilTest {
         prop.setProperty(EFO_REGISTRATION_TYPE, NONE.name());
         assertTrue(AwsV2Util.isNoneEfoRegistrationType(prop));
     }
+
+    @Test
+    public void testIsRecoverableExceptionForRecoverable() {
+        Exception recoverable = LimitExceededException.builder().build();
+        assertTrue(AwsV2Util.isRecoverableException(new ExecutionException(recoverable)));
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForNonRecoverable() {
+        Exception nonRecoverable = new IllegalArgumentException("abc");
+        assertFalse(AwsV2Util.isRecoverableException(new ExecutionException(nonRecoverable)));
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForRuntimeExceptionWrappingRecoverable() {
+        Exception recoverable = LimitExceededException.builder().build();
+        Exception runtime = new RuntimeException("abc", recoverable);
+        assertTrue(AwsV2Util.isRecoverableException(runtime));
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForRuntimeExceptionWrappingNonRecoverable() {
+        Exception nonRecoverable = new IllegalArgumentException("abc");
+        Exception runtime = new RuntimeException("abc", nonRecoverable);
+        assertFalse(AwsV2Util.isRecoverableException(runtime));
+    }
+
+    @Test
+    public void testIsRecoverableExceptionForNullCause() {
+        Exception nonRecoverable = new IllegalArgumentException("abc");
+        assertFalse(AwsV2Util.isRecoverableException(nonRecoverable));
+    }
 }
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 7495412..a4c97b7 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -55,6 +55,7 @@ under the License.
 		<module>flink-connector-aws-base</module>
 		<module>flink-connector-kinesis</module>
 		<module>flink-connector-aws-kinesis-data-streams</module>
+		<module>flink-connector-aws-kinesis-firehose</module>
 		<module>flink-connector-base</module>
 		<module>flink-file-sink-common</module>
 		<module>flink-connector-files</module>
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
index afc6d9b..99c5293 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
@@ -37,6 +37,8 @@ public class DockerImageVersions {
 
     public static final String KINESALITE = "instructure/kinesalite:latest";
 
+    public static final String LOCALSTACK = "localstack/localstack:latest";
+
     public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
     public static final String CASSANDRA_3 = "cassandra:3.0";
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index 7793c0b..d70c9ff 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -109,6 +109,7 @@ flink-connectors/flink-connector-rabbitmq,\
 flink-connectors/flink-connector-twitter,\
 flink-connectors/flink-connector-kinesis,\
 flink-connectors/flink-connector-aws-kinesis-data-streams,\
+flink-connectors/flink-connector-aws-kinesis-firehose,\
 flink-metrics/flink-metrics-dropwizard,\
 flink-metrics/flink-metrics-graphite,\
 flink-metrics/flink-metrics-jmx,\