Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/20 16:13:28 UTC

[GitHub] [flink] dannycranmer commented on a change in pull request #18314: [FLINK-24228][connectors/firehose] - Unified Async Sink for Kinesis Firehose

dannycranmer commented on a change in pull request #18314:
URL: https://github.com/apache/flink/pull/18314#discussion_r788834321



##########
File path: flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSUnifiedSinksConfigConstants.java
##########
@@ -15,19 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.kinesis.config;
+package org.apache.flink.connector.aws.config;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;
+import org.apache.flink.connector.aws.util.AWSUnifiedSinksUtil;
 
-/** Defaults for {@link AWSKinesisDataStreamsUtil}. */
+/** Defaults for {@link AWSUnifiedSinksUtil}. */
 @PublicEvolving
-public class AWSKinesisDataStreamsConfigConstants {
+public class AWSUnifiedSinksConfigConstants {
 
     public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT =
             "Apache Flink %s (%s) Kinesis Connector";
 
-    /** Identifier for user agent prefix. */
+    /** Kinesis identifier for user agent prefix. */
     public static final String KINESIS_CLIENT_USER_AGENT_PREFIX =
             "aws.kinesis.client.user-agent-prefix";
+
+    public static final String BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT =
+            "Apache Flink %s (%s) Firehose Connector";
+
+    /** Firehose identifier for user agent prefix. */
+    public static final String FIREHOSE_CLIENT_USER_AGENT_PREFIX =
+            "aws.firehose.client.user-agent-prefix";

Review comment:
       It is a code smell to put Kinesis/Firehose concerns in the base module. Can we move them to the respective modules?
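
A minimal sketch of what moving the Firehose constants into the Firehose module could look like. The class name `FirehoseConfigConstants` and the helper `formatUserAgentPrefix` are hypothetical and not part of the PR; only the two constant values are taken from the diff above.

```java
/**
 * Hypothetical constants holder that would live in the Firehose connector module
 * instead of flink-connector-aws-base. The class name is an assumption.
 */
final class FirehoseConfigConstants {

    /** Format string copied from the diff; it has two %s placeholders filled by the caller. */
    static final String BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT =
            "Apache Flink %s (%s) Firehose Connector";

    /** Firehose identifier for user agent prefix (value copied from the diff). */
    static final String FIREHOSE_CLIENT_USER_AGENT_PREFIX =
            "aws.firehose.client.user-agent-prefix";

    private FirehoseConfigConstants() {
        // Constants holder, not meant to be instantiated.
    }

    /**
     * Hypothetical convenience helper: fills the two placeholders of the format string.
     * What the real connector passes here is not shown in the diff.
     */
    static String formatUserAgentPrefix(String first, String second) {
        return String.format(BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT, first, second);
    }
}
```

With a split like this, the base module would only need to accept the format string and the property key as parameters, which `createAwsAsyncClient` already appears to do elsewhere in the PR.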

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/pom.xml
##########
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.15-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-aws-kinesis-data-firehose</artifactId>
+	<name>Flink : Connectors : AWS Kinesis Data Firehose</name>
+	<properties>
+		<aws.sdk.version>2.17.52</aws.sdk.version>

Review comment:
       Is there a common place to put this so that we inherit it? We need to keep the AWS SDK version in sync across all connector modules.

##########
File path: flink-connectors/flink-connector-aws-base/pom.xml
##########
@@ -57,6 +57,13 @@ under the License.
 			<artifactId>sts</artifactId>
 			<version>${aws.sdk.version}</version>
 		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>kinesis</artifactId>
+			<version>${aws.sdk.version}</version>
+			<scope>test</scope>
+		</dependency>

Review comment:
       Why is this needed? Can we keep the tests general in the base class? Maybe implement a dummy client? 

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/pom.xml
##########
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.15-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-aws-kinesis-data-firehose</artifactId>

Review comment:
       There is no data in firehose :D 
   
   `flink-connector-aws-kinesis-data-firehose` > `flink-connector-aws-kinesis-firehose`

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisDataFirehoseTestUtils.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.testutils;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.config.AWSUnifiedSinksConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.aws.util.AWSUnifiedSinksUtil;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.waiters.WaiterResponse;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
+import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
+import software.amazon.awssdk.services.firehose.model.ExtendedS3DestinationConfiguration;
+import software.amazon.awssdk.services.iam.IamAsyncClient;
+import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
+import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.waiters.S3AsyncWaiter;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the Localstack container.
+ */
+public class KinesisDataFirehoseTestUtils {
+
+    private static final String ACCESS_KEY_ID = "accessKeyId";
+    private static final String SECRET_ACCESS_KEY = "secretAccessKey";
+
+    public static S3AsyncClient makeS3Client(String endpoint) throws URISyntaxException {
+        return S3AsyncClient.builder()
+                .httpClient(getHttpClient(endpoint))
+                .region(Region.AP_SOUTHEAST_1)
+                .endpointOverride(new URI(endpoint))
+                .credentialsProvider(getDefaultCredentials())
+                .build();
+    }
+
+    public static FirehoseAsyncClient makeFirehoseClient(String endpoint)
+            throws URISyntaxException {
+        return AWSUnifiedSinksUtil.createAwsAsyncClient(
+                getConfig(endpoint),
+                getHttpClient(endpoint),
+                FirehoseAsyncClient.builder().endpointOverride(new URI(endpoint)),
+                AWSUnifiedSinksConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+                AWSUnifiedSinksConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);

Review comment:
       Same as above

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisDataFirehoseSink.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery
+ * stream using the buffering protocol specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link
+ * software.amazon.awssdk.services.firehose.FirehoseAsyncClient} to communicate with the AWS
+ * endpoint.
+ *
+ * <p>The behaviour of the buffering may be specified by providing configuration during the sink
+ * build time.
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize}: the maximum size of a batch of entries that may be sent to KDF
+ *   <li>{@code maxInFlightRequests}: the maximum number of in flight requests that may exist, if
+ *       any more in flight requests need to be initiated once the maximum has been reached, then it
+ *       will be blocked until some have completed
+ *   <li>{@code maxBufferedRequests}: the maximum number of elements held in the buffer, requests to
+ *       add elements will be blocked while the number of elements in the buffer is at the maximum
+ *   <li>{@code maxBatchSizeInBytes}: the maximum size of a batch of entries that may be sent to KDF
+ *       measured in bytes
+ *   <li>{@code maxTimeInBufferMS}: the maximum amount of time an entry is allowed to live in the
+ *       buffer, if any element reaches this age, the entire buffer will be flushed immediately
+ *   <li>{@code maxRecordSizeInBytes}: the maximum size of a record the sink will accept into the
+ *       buffer, a record of size larger than this will be rejected when passed to the sink
+ *   <li>{@code failOnError}: when an exception is encountered while persisting to Kinesis Data
+ *       Firehose, the job will fail immediately if failOnError is set
+ * </ul>
+ *
+ * <p>Please see the writer implementation in {@link KinesisDataFirehoseSinkWriter}
+ *
+ * @param <InputT> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class KinesisDataFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> {
+
+    private final boolean failOnError;
+    private final String deliveryStreamName;
+    private final Properties kinesisClientProperties;
+
+    KinesisDataFirehoseSink(
+            ElementConverter<InputT, Record> elementConverter,
+            Integer maxBatchSize,
+            Integer maxInFlightRequests,
+            Integer maxBufferedRequests,
+            Long maxBatchSizeInBytes,
+            Long maxTimeInBufferMS,
+            Long maxRecordSizeInBytes,

Review comment:
       Did you consider wrapping these common async sink configs into a class? There are a lot of parameters here
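
One way the wrapping could look, as a rough sketch only. `AsyncSinkBufferConfig` is a hypothetical class, not an existing Flink type, and the default values in the builder are placeholders chosen for illustration rather than Flink's actual defaults. The six field names mirror the constructor parameters in the hunk above.

```java
/** Hypothetical value object grouping the six buffering parameters; not an existing Flink class. */
final class AsyncSinkBufferConfig {
    final int maxBatchSize;
    final int maxInFlightRequests;
    final int maxBufferedRequests;
    final long maxBatchSizeInBytes;
    final long maxTimeInBufferMS;
    final long maxRecordSizeInBytes;

    private AsyncSinkBufferConfig(Builder b) {
        // Illustrative validation: every limit must be strictly positive.
        checkPositive(b.maxBatchSize, "maxBatchSize");
        checkPositive(b.maxInFlightRequests, "maxInFlightRequests");
        checkPositive(b.maxBufferedRequests, "maxBufferedRequests");
        checkPositive(b.maxBatchSizeInBytes, "maxBatchSizeInBytes");
        checkPositive(b.maxTimeInBufferMS, "maxTimeInBufferMS");
        checkPositive(b.maxRecordSizeInBytes, "maxRecordSizeInBytes");
        this.maxBatchSize = b.maxBatchSize;
        this.maxInFlightRequests = b.maxInFlightRequests;
        this.maxBufferedRequests = b.maxBufferedRequests;
        this.maxBatchSizeInBytes = b.maxBatchSizeInBytes;
        this.maxTimeInBufferMS = b.maxTimeInBufferMS;
        this.maxRecordSizeInBytes = b.maxRecordSizeInBytes;
    }

    static Builder builder() {
        return new Builder();
    }

    private static void checkPositive(long value, String name) {
        if (value <= 0) {
            throw new IllegalArgumentException(name + " must be positive");
        }
    }

    /** Fluent builder so callers set only the values they care about. */
    static final class Builder {
        // Placeholder defaults for illustration only; not Flink's defaults.
        private int maxBatchSize = 500;
        private int maxInFlightRequests = 16;
        private int maxBufferedRequests = 10_000;
        private long maxBatchSizeInBytes = 4L * 1024 * 1024;
        private long maxTimeInBufferMS = 5_000L;
        private long maxRecordSizeInBytes = 1_000L * 1024;

        Builder maxBatchSize(int v) { this.maxBatchSize = v; return this; }
        Builder maxInFlightRequests(int v) { this.maxInFlightRequests = v; return this; }
        Builder maxBufferedRequests(int v) { this.maxBufferedRequests = v; return this; }
        Builder maxBatchSizeInBytes(long v) { this.maxBatchSizeInBytes = v; return this; }
        Builder maxTimeInBufferMS(long v) { this.maxTimeInBufferMS = v; return this; }
        Builder maxRecordSizeInBytes(long v) { this.maxRecordSizeInBytes = v; return this; }

        AsyncSinkBufferConfig build() {
            return new AsyncSinkBufferConfig(this);
        }
    }
}
```

A sink constructor could then take a single `AsyncSinkBufferConfig` argument in place of the six boxed values, which would also keep the sink and writer signatures aligned.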

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisDataFirehoseSinkWriter.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.aws.config.AWSUnifiedSinksConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.aws.util.AWSUnifiedSinksUtil;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
+import software.amazon.awssdk.services.firehose.model.Record;
+import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link KinesisDataFirehoseSink} to write to Kinesis Data Firehose. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * KinesisDataFirehoseSink}. More details on the internals of this sink writer may be found in
+ * {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+class KinesisDataFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFirehoseSinkWriter.class);
+
+    /* A counter for the total number of records that have encountered an error during put */
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* Name of the delivery stream in Kinesis Data Firehose */
+    private final String deliveryStreamName;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    /* The asynchronous Kinesis client - construction is by kinesisClientProperties */
+    private final FirehoseAsyncClient client;
+
+    /* Flag to whether fatally fail any time we encounter an exception when persisting records */
+    private final boolean failOnError;
+
+    KinesisDataFirehoseSinkWriter(
+            ElementConverter<InputT, Record> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            String deliveryStreamName,
+            Properties kinesisClientProperties) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.failOnError = failOnError;
+        this.deliveryStreamName = deliveryStreamName;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
+        this.client = buildClient(kinesisClientProperties);
+    }
+
+    private FirehoseAsyncClient buildClient(Properties kinesisClientProperties) {
+        final SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
+
+        return AWSUnifiedSinksUtil.createAwsAsyncClient(
+                kinesisClientProperties,
+                httpClient,
+                FirehoseAsyncClient.builder(),

Review comment:
       Awesome!

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisDataFirehoseSink.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery
+ * stream using the buffering protocol specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link
+ * software.amazon.awssdk.services.firehose.FirehoseAsyncClient} to communicate with the AWS
+ * endpoint.
+ *
+ * <p>The behaviour of the buffering may be specified by providing configuration during the sink
+ * build time.
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize}: the maximum size of a batch of entries that may be sent to KDF
+ *   <li>{@code maxInFlightRequests}: the maximum number of in flight requests that may exist, if
+ *       any more in flight requests need to be initiated once the maximum has been reached, then it
+ *       will be blocked until some have completed
+ *   <li>{@code maxBufferedRequests}: the maximum number of elements held in the buffer, requests to
+ *       add elements will be blocked while the number of elements in the buffer is at the maximum
+ *   <li>{@code maxBatchSizeInBytes}: the maximum size of a batch of entries that may be sent to KDF
+ *       measured in bytes
+ *   <li>{@code maxTimeInBufferMS}: the maximum amount of time an entry is allowed to live in the
+ *       buffer, if any element reaches this age, the entire buffer will be flushed immediately
+ *   <li>{@code maxRecordSizeInBytes}: the maximum size of a record the sink will accept into the
+ *       buffer, a record of size larger than this will be rejected when passed to the sink
+ *   <li>{@code failOnError}: when an exception is encountered while persisting to Kinesis Data
+ *       Firehose, the job will fail immediately if failOnError is set
+ * </ul>
+ *
+ * <p>Please see the writer implementation in {@link KinesisDataFirehoseSinkWriter}
+ *
+ * @param <InputT> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class KinesisDataFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> {
+
+    private final boolean failOnError;
+    private final String deliveryStreamName;
+    private final Properties kinesisClientProperties;
+
+    KinesisDataFirehoseSink(
+            ElementConverter<InputT, Record> elementConverter,
+            Integer maxBatchSize,
+            Integer maxInFlightRequests,
+            Integer maxBufferedRequests,
+            Long maxBatchSizeInBytes,
+            Long maxTimeInBufferMS,
+            Long maxRecordSizeInBytes,
+            boolean failOnError,
+            String deliveryStreamName,
+            Properties kinesisClientProperties) {
+        super(
+                elementConverter,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.deliveryStreamName =
+                Preconditions.checkNotNull(
+                        deliveryStreamName,
+                        "The delivery stream name must not be null when initializing the KDF Sink.");
+        Preconditions.checkArgument(
+                !this.deliveryStreamName.isEmpty(),
+                "The delivery stream name must be set when initializing the KDF Sink.");
+        this.failOnError = failOnError;
+        this.kinesisClientProperties = kinesisClientProperties;
+    }
+
+    /**
+     * Create a {@link KinesisDataFirehoseSinkBuilder} to allow the fluent construction of a new
+     * {@code KinesisDataFirehoseSink}.
+     *
+     * @param <InputT> type of incoming records
+     * @return {@link KinesisDataFirehoseSinkBuilder}
+     */
+    public static <InputT> KinesisDataFirehoseSinkBuilder<InputT> builder() {
+        return new KinesisDataFirehoseSinkBuilder<>();
+    }
+
+    @Experimental

Review comment:
       Why did you mark this as Experimental? When do you plan to remove this?

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisDataFirehoseSinkBuilder.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link KinesisDataFirehoseSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link KinesisDataFirehoseSink} that
+ * writes String values to a Kinesis Data Firehose delivery stream named delivery-stream-name.
+ *
+ * <pre>{@code
+ * private static final ElementConverter<String, Record> elementConverter =

Review comment:
       As with last time, I would rather not expose `Record`, so let's document `KinesisDataFirehoseSinkElementConverter<String>` instead?

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisDataFirehoseException.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+/**
+ * A {@link RuntimeException} wrapper indicating the exception was thrown from the Kinesis Data
+ * Firehose Sink.
+ */
+class KinesisDataFirehoseException extends RuntimeException {

Review comment:
       Missing compatibility annotation. Please check all classes in the PR.

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisDataFirehoseSink.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery
+ * stream using the buffering protocol specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link
+ * software.amazon.awssdk.services.firehose.FirehoseAsyncClient} to communicate with the AWS
+ * endpoint.
+ *
+ * <p>The behaviour of the buffering may be specified by providing configuration during the sink
+ * build time.
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize}: the maximum size of a batch of entries that may be sent to KDF
+ *   <li>{@code maxInFlightRequests}: the maximum number of in flight requests that may exist, if
+ *       any more in flight requests need to be initiated once the maximum has been reached, then it
+ *       will be blocked until some have completed
+ *   <li>{@code maxBufferedRequests}: the maximum number of elements held in the buffer, requests to
+ *       add elements will be blocked while the number of elements in the buffer is at the maximum
+ *   <li>{@code maxBatchSizeInBytes}: the maximum size of a batch of entries that may be sent to KDF
+ *       measured in bytes
+ *   <li>{@code maxTimeInBufferMS}: the maximum amount of time an entry is allowed to live in the
+ *       buffer, if any element reaches this age, the entire buffer will be flushed immediately
+ *   <li>{@code maxRecordSizeInBytes}: the maximum size of a record the sink will accept into the
+ *       buffer, a record of size larger than this will be rejected when passed to the sink
+ *   <li>{@code failOnError}: when an exception is encountered while persisting to Kinesis Data
+ *       Firehose, the job will fail immediately if failOnError is set

Review comment:
       nit: These look like common comments that are shared between sinks. Maybe we should put this info in one place?

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisDataFirehoseSinkITCase.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.firehose.sink.testutils.LocalstackContainer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.Record;
+import software.amazon.awssdk.services.iam.IamAsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.List;
+
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisDataFirehoseTestUtils.createDeliveryStream;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisDataFirehoseTestUtils.createIAMRole;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisDataFirehoseTestUtils.createIamClient;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisDataFirehoseTestUtils.getConfig;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisDataFirehoseTestUtils.listBucketObjects;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisDataFirehoseTestUtils.makeBucket;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisDataFirehoseTestUtils.makeFirehoseClient;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisDataFirehoseTestUtils.makeS3Client;
+import static org.junit.Assert.assertEquals;
+
+/** Integration test suite for the {@code KinesisDataFirehoseSink} using a localstack container. */
+public class KinesisDataFirehoseSinkITCase {
+
+    private static final ElementConverter<String, Record> elementConverter =
+            KinesisDataFirehoseSinkElementConverter.<String>builder()
+                    .setSerializationSchema(new SimpleStringSchema())
+                    .build();
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFirehoseSinkITCase.class);
+    private S3AsyncClient s3AsyncClient;
+    private FirehoseAsyncClient firehoseAsyncClient;
+    private IamAsyncClient iamAsyncClient;
+
+    private static final String ROLE_NAME = "super-role";
+    private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME;
+    private static final String BUCKET_NAME = "s3-firehose";
+    private static final String STREAM_NAME = "s3-stream";
+    private static final int NUMBER_OF_ELEMENTS = 92;
+
+    @ClassRule
+    public static LocalstackContainer mockFirehoseContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
+
+    @Before
+    public void setup() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+        s3AsyncClient = makeS3Client(mockFirehoseContainer.getEndpoint());

Review comment:
       nit: `get` and `build` are commonly used verbs; `make` is not standard.

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisDataFirehoseSinkWriter.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.aws.config.AWSUnifiedSinksConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.aws.util.AWSUnifiedSinksUtil;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
+import software.amazon.awssdk.services.firehose.model.Record;
+import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link KinesisDataFirehoseSink} to write to Kinesis Data Firehose. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * KinesisDataFirehoseSink}. More details on the internals of this sink writer may be found in
+ * {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+class KinesisDataFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFirehoseSinkWriter.class);
+
+    /* A counter for the total number of records that have encountered an error during put */
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* Name of the delivery stream in Kinesis Data Firehose */
+    private final String deliveryStreamName;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    /* The asynchronous Kinesis client - construction is by kinesisClientProperties */
+    private final FirehoseAsyncClient client;
+
+    /* Flag to whether fatally fail any time we encounter an exception when persisting records */
+    private final boolean failOnError;
+
+    KinesisDataFirehoseSinkWriter(
+            ElementConverter<InputT, Record> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            String deliveryStreamName,
+            Properties kinesisClientProperties) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.failOnError = failOnError;
+        this.deliveryStreamName = deliveryStreamName;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
+        this.client = buildClient(kinesisClientProperties);
+    }
+
+    private FirehoseAsyncClient buildClient(Properties kinesisClientProperties) {
+        final SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
+
+        return AWSUnifiedSinksUtil.createAwsAsyncClient(
+                kinesisClientProperties,
+                httpClient,
+                FirehoseAsyncClient.builder(),
+                AWSUnifiedSinksConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+                AWSUnifiedSinksConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);

Review comment:
       Should these be Firehose? If not, they are probably named wrong.
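
For illustration, the constants file in this PR also defines `BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT` and `FIREHOSE_CLIENT_USER_AGENT_PREFIX`. The sketch below shows how the default prefix would differ if those were passed instead. It is self-contained and makes several assumptions: the class `UserAgentPrefixSketch`, the helper `resolvePrefix`, the property-fallback behaviour, and the sample version and commit id are all hypothetical and are not taken from `AWSUnifiedSinksUtil`.

```java
import java.util.Properties;

/** Illustrative only: copies the format strings added to AWSUnifiedSinksConfigConstants. */
final class UserAgentPrefixSketch {

    static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT =
            "Apache Flink %s (%s) Kinesis Connector";
    static final String BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT =
            "Apache Flink %s (%s) Firehose Connector";
    static final String FIREHOSE_CLIENT_USER_AGENT_PREFIX =
            "aws.firehose.client.user-agent-prefix";

    /**
     * Hypothetical helper: returns the user-supplied prefix if the key is set,
     * otherwise the default built from the format string. The assumption that the
     * two placeholders are a version and a commit id is ours, not the PR's.
     */
    static String resolvePrefix(
            Properties config, String key, String format, String version, String commitId) {
        return config.getProperty(key, String.format(format, version, commitId));
    }

    public static void main(String[] args) {
        Properties config = new Properties(); // no override configured
        System.out.println(
                resolvePrefix(
                        config,
                        FIREHOSE_CLIENT_USER_AGENT_PREFIX,
                        BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT,
                        "1.15-SNAPSHOT",
                        "abc1234"));
        // prints "Apache Flink 1.15-SNAPSHOT (abc1234) Firehose Connector"
    }
}
```

With the Kinesis format string, the same call would identify a Firehose client as "Kinesis Connector" in its user agent, which is the mismatch this comment is asking about.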

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/LocalstackContainer.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.testutils;
+
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A class wrapping the Localstack container that provides mock implementations of many common AWS
+ * services.
+ */
+public class LocalstackContainer extends GenericContainer<LocalstackContainer> {
+
+    private static final int CONTAINER_PORT = 4566;
+
+    public LocalstackContainer(DockerImageName imageName) {
+        super(imageName);
+        withExposedPorts(CONTAINER_PORT);
+        waitingFor(new ListStreamsWaitStrategy());
+    }
+
+    public String getEndpoint() {
+        return String.format("https://%s:%s", getHost(), getMappedPort(CONTAINER_PORT));
+    }
+
+    private class ListStreamsWaitStrategy extends AbstractWaitStrategy {
+        private static final int TRANSACTIONS_PER_SECOND = 1;
+
+        private final RateLimiter rateLimiter =
+                RateLimiterBuilder.newBuilder()
+                        .withRate(TRANSACTIONS_PER_SECOND, SECONDS)
+                        .withConstantThroughput()
+                        .build();
+
+        @Override
+        protected void waitUntilReady() {
+            try {
+                Thread.sleep(10000);

Review comment:
       Is this sleep required for this test? If so, maybe the issue on the Kinesis tests was not with Kinesalite after all.
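
If the fixed sleep does turn out to be needed, one possible alternative is to poll a readiness probe with a deadline rather than always waiting the full 10 seconds. The following is a minimal sketch using only the JDK; `PollingWaitSketch` and `waitUntil` are hypothetical names, and the probe here is a stand-in for whatever call the real wait strategy makes against the container.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Illustrative only: polls a probe until it succeeds or a deadline passes. */
final class PollingWaitSketch {

    static void waitUntil(BooleanSupplier probe, Duration timeout, Duration interval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            boolean ready;
            try {
                ready = probe.getAsBoolean();
            } catch (RuntimeException e) {
                // A failing probe is treated as "not ready yet" and retried.
                ready = false;
            }
            if (ready) {
                return;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Not ready after " + timeout);
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Stand-in probe: reports ready on the third attempt.
        waitUntil(() -> ++attempts[0] >= 3, Duration.ofSeconds(5), Duration.ofMillis(100));
        System.out.println("ready after " + attempts[0] + " attempts");
    }
}
```

This returns as soon as the probe succeeds, so a container that is ready early does not cost the whole fixed delay, and a container that never becomes ready fails with a clear timeout.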

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/pom.xml
##########
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.15-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-aws-kinesis-data-firehose</artifactId>
+	<name>Flink : Connectors : AWS Kinesis Data Firehose</name>
+	<properties>
+		<aws.sdk.version>2.17.52</aws.sdk.version>
+	</properties>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-aws-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>sdk-core</artifactId>
+			<version>${aws.sdk.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>kinesis</artifactId>
+			<version>${aws.sdk.version}</version>
+		</dependency>

Review comment:
       Why do we need to depend on Kinesis?

##########
File path: flink-connectors/flink-connector-aws-kinesis-data-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/LocalstackContainer.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.testutils;
+
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A class wrapping the Localstack container that provides mock implementations of many common AWS
+ * services.
+ */
+public class LocalstackContainer extends GenericContainer<LocalstackContainer> {
+
+    private static final int CONTAINER_PORT = 4566;
+
+    public LocalstackContainer(DockerImageName imageName) {
+        super(imageName);
+        withExposedPorts(CONTAINER_PORT);
+        //        setPortBindings(
+        //                Collections.singletonList(String.format("%s:%s",
+        // getMappedPort(CONTAINER_PORT), CONTAINER_PORT)));
+        waitingFor(new ListStreamsWaitStrategy());
+    }
+
+    public String getEndpoint() {
+        return String.format("https://%s:%s", getHost(), getMappedPort(CONTAINER_PORT));
+    }
+
+    private class ListStreamsWaitStrategy extends AbstractWaitStrategy {

Review comment:
       This is actually a `listBuckets` wait strategy?



