You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/02/01 15:35:51 UTC

[flink] 03/03: [FLINK-24228][connectors/firehose] Allows end users to supply a serialization schema rather than an ElementConverter, thereby encapsulating the Firehose `Record` from the user, verifying stream objects in KinesisFirehoseITCase

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9050fd56591cbcb3270dd40ea370ea605bda8d39
Author: Zichen Liu <zi...@amazon.com>
AuthorDate: Mon Jan 24 18:55:17 2022 +0000

    [FLINK-24228][connectors/firehose] Allows end users to supply a serialization schema rather than an ElementConverter, thereby encapsulating the Firehose `Record` from the user, verifying stream objects in KinesisFirehoseITCase
---
 .../docs/connectors/datastream/firehose.md         | 56 +++++++---------
 .../content/docs/connectors/datastream/firehose.md | 56 +++++++---------
 .../aws/testutils/AWSServicesTestUtils.java        | 44 +++++++++++++
 .../firehose/sink/KinesisFirehoseSinkBuilder.java  | 27 ++++++--
 .../sink/KinesisFirehoseSinkElementConverter.java  |  8 +--
 .../firehose/sink/KinesisFirehoseSinkWriter.java   |  9 +--
 .../sink/KinesisFirehoseSinkBuilderTest.java       | 16 ++---
 .../KinesisFirehoseSinkElementConverterTest.java   |  2 +-
 .../firehose/sink/KinesisFirehoseSinkITCase.java   | 31 +++++----
 .../firehose/sink/examples/SinkIntoFirehose.java   | 74 ----------------------
 10 files changed, 143 insertions(+), 180 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/firehose.md b/docs/content.zh/docs/connectors/datastream/firehose.md
index ecc77ce..e958a7c 100644
--- a/docs/content.zh/docs/connectors/datastream/firehose.md
+++ b/docs/content.zh/docs/connectors/datastream/firehose.md
@@ -38,11 +38,6 @@ The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com
 {{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
 {{< tab "Java" >}}
 ```java
-KinesisFirehoseSinkElementConverter<String> elementConverter =
-    KinesisFirehoseSinkElementConverter.<String>builder()
-        .setSerializationSchema(new SimpleStringSchema())
-        .build();
-
 Properties sinkProperties = new Properties();
 // Required
 sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
@@ -52,16 +47,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
 
 KinesisFirehoseSink<String> kdfSink =
     KinesisFirehoseSink.<String>builder()
-        .setFirehoseClientProperties(sinkProperties)    // Required
-        .setElementConverter(elementConverter)         // Required
-        .setDeliveryStreamName("your-stream-name")     // Required
-        .setFailOnError(false)                         // Optional
-        .setMaxBatchSize(500)                          // Optional
-        .setMaxInFlightRequests(50)                    // Optional
-        .setMaxBufferedRequests(10_000)                // Optional
-        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
-        .setMaxTimeInBufferMS(5000)                    // Optional
-        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
         .build();
 
 flinkStream.sinkTo(kdfSink);
@@ -69,11 +64,6 @@ flinkStream.sinkTo(kdfSink);
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val elementConverter =
-    KinesisFirehoseSinkElementConverter.<String>builder()
-        .setSerializationSchema(new SimpleStringSchema())
-        .build()
-
 Properties sinkProperties = new Properties()
 // Required
 sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
@@ -83,16 +73,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
 
 val kdfSink =
     KinesisFirehoseSink.<String>builder()
-        .setFirehoseClientProperties(sinkProperties)    // Required
-        .setElementConverter(elementConverter)         // Required
-        .setDeliveryStreamName("your-stream-name")     // Required
-        .setFailOnError(false)                         // Optional
-        .setMaxBatchSize(500)                          // Optional
-        .setMaxInFlightRequests(50)                    // Optional
-        .setMaxBufferedRequests(10_000)                // Optional
-        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
-        .setMaxTimeInBufferMS(5000)                    // Optional
-        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
         .build()
 
 flinkStream.sinkTo(kdfSink)
@@ -102,14 +92,14 @@ flinkStream.sinkTo(kdfSink)
 
 ## Configurations
 
-Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`.
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`.
 
 1. __setFirehoseClientProperties(Properties sinkProperties)__
     * Required.
     * Supplies credentials, region and other parameters to the Firehose client.
-2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__
+2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__
     * Required.
-    * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example.
+    * Supplies a serialization schema to the sink. This schema is used to serialize elements before they are sent to Firehose.
 3. __setDeliveryStreamName(String deliveryStreamName)__
     * Required.
     * Name of the delivery stream to sink to.
diff --git a/docs/content/docs/connectors/datastream/firehose.md b/docs/content/docs/connectors/datastream/firehose.md
index b224665..20beb61 100644
--- a/docs/content/docs/connectors/datastream/firehose.md
+++ b/docs/content/docs/connectors/datastream/firehose.md
@@ -38,11 +38,6 @@ The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com
 {{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
 {{< tab "Java" >}}
 ```java
-KinesisFirehoseSinkElementConverter<String> elementConverter =
-    KinesisFirehoseSinkElementConverter.<String>builder()
-        .setSerializationSchema(new SimpleStringSchema())
-        .build();
-
 Properties sinkProperties = new Properties();
 // Required
 sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
@@ -52,16 +47,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
 
 KinesisFirehoseSink<String> kdfSink =
     KinesisFirehoseSink.<String>builder()
-        .setFirehoseClientProperties(sinkProperties)    // Required
-        .setElementConverter(elementConverter)         // Required
-        .setDeliveryStreamName("your-stream-name")     // Required
-        .setFailOnError(false)                         // Optional
-        .setMaxBatchSize(500)                          // Optional
-        .setMaxInFlightRequests(50)                    // Optional
-        .setMaxBufferedRequests(10_000)                // Optional
-        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
-        .setMaxTimeInBufferMS(5000)                    // Optional
-        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
         .build();
 
 flinkStream.sinkTo(kdfSink);
@@ -69,11 +64,6 @@ flinkStream.sinkTo(kdfSink);
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val elementConverter =
-    KinesisFirehoseSinkElementConverter.<String>builder()
-        .setSerializationSchema(new SimpleStringSchema())
-        .build()
-
 Properties sinkProperties = new Properties()
 // Required
 sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
@@ -83,16 +73,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
 
 val kdfSink =
     KinesisFirehoseSink.<String>builder()
-        .setFirehoseClientProperties(sinkProperties)    // Required
-        .setElementConverter(elementConverter)         // Required
-        .setDeliveryStreamName("your-stream-name")     // Required
-        .setFailOnError(false)                         // Optional
-        .setMaxBatchSize(500)                          // Optional
-        .setMaxInFlightRequests(50)                    // Optional
-        .setMaxBufferedRequests(10_000)                // Optional
-        .setMaxBatchSizeInBytes(4 * 1024 * 1024)       // Optional
-        .setMaxTimeInBufferMS(5000)                    // Optional
-        .setMaxRecordSizeInBytes(1000 * 1024)      // Optional
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
         .build()
 
 flinkStream.sinkTo(kdfSink)
@@ -102,14 +92,14 @@ flinkStream.sinkTo(kdfSink)
 
 ## Configurations
 
-Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`.
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`.
 
 1. __setFirehoseClientProperties(Properties sinkProperties)__
     * Required.
     * Supplies credentials, region and other parameters to the Firehose client.
-2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__
+2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__
     * Required.
-    * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example.
+    * Supplies a serialization schema to the sink. This schema is used to serialize elements before they are sent to Firehose.
 3. __setDeliveryStreamName(String deliveryStreamName)__
     * Required.
     * Name of the delivery stream to sink to.
diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
index 543d81b..ff915fa 100644
--- a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.waiters.WaiterResponse;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
@@ -31,6 +33,8 @@ import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
 import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
 import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
@@ -44,6 +48,8 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
@@ -126,4 +132,42 @@ public class AWSServicesTestUtils {
         CompletableFuture<ListObjectsResponse> res = s3.listObjects(listObjects);
         return res.get().contents();
     }
+
+    public static <T> List<T> readObjectsFromS3Bucket(
+            S3AsyncClient s3AsyncClient,
+            List<S3Object> objects,
+            String bucketName,
+            Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+        S3BucketReader bucketReader = new S3BucketReader(s3AsyncClient, bucketName);
+        return bucketReader.readObjects(objects, deserializer);
+    }
+
+    /** Helper class to read objects from S3. */
+    private static class S3BucketReader {
+        private final S3AsyncClient s3AsyncClient;
+        private final String bucketName;
+
+        public S3BucketReader(S3AsyncClient s3AsyncClient, String bucketName) {
+            this.s3AsyncClient = s3AsyncClient;
+            this.bucketName = bucketName;
+        }
+
+        public <T> List<T> readObjects(
+                List<S3Object> objectList,
+                Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+            return objectList.stream()
+                    .map(object -> readObjectWitKey(object.key(), deserializer))
+                    .collect(Collectors.toList());
+        }
+
+        public <T> T readObjectWitKey(
+                String key, Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+            GetObjectRequest getObjectRequest =
+                    GetObjectRequest.builder().bucket(bucketName).key(key).build();
+            return s3AsyncClient
+                    .getObject(getObjectRequest, AsyncResponseTransformer.toBytes())
+                    .thenApply(deserializer)
+                    .join();
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
index ee22e1f..a180abc 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
 
 import software.amazon.awssdk.http.Protocol;
@@ -36,11 +37,6 @@ import static software.amazon.awssdk.http.Protocol.HTTP1_1;
  * writes String values to a Kinesis Data Firehose delivery stream named delivery-stream-name.
  *
  * <pre>{@code
- * private static final KinesisFirehoseSinkElementConverter<String> elementConverter =
- *         KinesisFirehoseSinkElementConverter.<String>builder()
- *                 .setSerializationSchema(new SimpleStringSchema())
- *                 .build();
- *
  * Properties sinkProperties = new Properties();
  * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
  *
@@ -50,6 +46,7 @@ import static software.amazon.awssdk.http.Protocol.HTTP1_1;
  *                 .setDeliveryStreamName("delivery-stream-name")
  *                 .setMaxBatchSize(20)
  *                 .setFirehoseClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
  *                 .build();
  * }</pre>
  *
@@ -73,7 +70,7 @@ public class KinesisFirehoseSinkBuilder<InputT>
 
     private static final int DEFAULT_MAX_BATCH_SIZE = 500;
     private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
-    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000;
     private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 4 * 1024 * 1024;
     private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
     private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000 * 1024;
@@ -83,6 +80,7 @@ public class KinesisFirehoseSinkBuilder<InputT>
     private Boolean failOnError;
     private String deliveryStreamName;
     private Properties firehoseClientProperties;
+    private SerializationSchema<InputT> serializationSchema;
 
     KinesisFirehoseSinkBuilder() {}
 
@@ -100,6 +98,19 @@ public class KinesisFirehoseSinkBuilder<InputT>
     }
 
     /**
+     * Specifies the serialization schema used to serialize each record before it is
+     * persisted to Firehose.
+     *
+     * @param serializationSchema serialization schema to use
+     * @return {@link KinesisFirehoseSinkBuilder} itself
+     */
+    public KinesisFirehoseSinkBuilder<InputT> setSerializationSchema(
+            SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+        return this;
+    }
+
+    /**
      * If writing to Kinesis Data Firehose results in a partial or full failure being returned, the
      * job will fail immediately with a {@link KinesisFirehoseException} if failOnError is set.
      *
@@ -134,7 +145,9 @@ public class KinesisFirehoseSinkBuilder<InputT>
     @Override
     public KinesisFirehoseSink<InputT> build() {
         return new KinesisFirehoseSink<>(
-                getElementConverter(),
+                KinesisFirehoseSinkElementConverter.<InputT>builder()
+                        .setSerializationSchema(serializationSchema)
+                        .build(),
                 Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
                 Optional.ofNullable(getMaxInFlightRequests())
                         .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
index 45b4186..cca749c 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.connector.firehose.sink;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -31,7 +31,7 @@ import software.amazon.awssdk.services.firehose.model.Record;
  * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
  * {@link Record} that may be persisted.
  */
-@PublicEvolving
+@Internal
 public class KinesisFirehoseSinkElementConverter<InputT>
         implements ElementConverter<InputT, Record> {
 
@@ -54,7 +54,6 @@ public class KinesisFirehoseSinkElementConverter<InputT>
     }
 
     /** A builder for the KinesisFirehoseSinkElementConverter. */
-    @PublicEvolving
     public static class Builder<InputT> {
 
         private SerializationSchema<InputT> serializationSchema;
@@ -68,8 +67,7 @@ public class KinesisFirehoseSinkElementConverter<InputT>
         public KinesisFirehoseSinkElementConverter<InputT> build() {
             Preconditions.checkNotNull(
                     serializationSchema,
-                    "No SerializationSchema was supplied to the "
-                            + "KinesisFirehoseSinkElementConverter builder.");
+                    "No SerializationSchema was supplied to the " + "KinesisFirehoseSink builder.");
             return new KinesisFirehoseSinkElementConverter<>(serializationSchema);
         }
     }
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index 4002ec3..b41103f 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -37,7 +37,6 @@ import software.amazon.awssdk.services.firehose.model.Record;
 import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -116,7 +115,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
 
     @Override
     protected void submitRequestEntries(
-            List<Record> requestEntries, Consumer<Collection<Record>> requestResult) {
+            List<Record> requestEntries, Consumer<List<Record>> requestResult) {
 
         PutRecordBatchRequest batchRequest =
                 PutRecordBatchRequest.builder()
@@ -146,9 +145,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
     }
 
     private void handleFullyFailedRequest(
-            Throwable err,
-            List<Record> requestEntries,
-            Consumer<Collection<Record>> requestResult) {
+            Throwable err, List<Record> requestEntries, Consumer<List<Record>> requestResult) {
         LOG.warn(
                 "KDF Sink failed to persist {} entries to KDF first request was {}",
                 requestEntries.size(),
@@ -164,7 +161,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
     private void handlePartiallyFailedRequest(
             PutRecordBatchResponse response,
             List<Record> requestEntries,
-            Consumer<Collection<Record>> requestResult) {
+            Consumer<List<Record>> requestResult) {
         LOG.warn(
                 "KDF Sink failed to persist {} entries to KDF first request was {}",
                 requestEntries.size(),
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
index 0dc85da..88f4329 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
@@ -17,19 +17,17 @@
 
 package org.apache.flink.connector.firehose.sink;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
-import software.amazon.awssdk.services.firehose.model.Record;
 
 /** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */
 public class KinesisFirehoseSinkBuilderTest {
-    private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
-            KinesisFirehoseSinkElementConverter.<String>builder()
-                    .setSerializationSchema(new SimpleStringSchema())
-                    .build();
+
+    private static final SerializationSchema<String> SERIALIZATION_SCHEMA =
+            new SimpleStringSchema();
 
     @Test
     public void elementConverterOfSinkMustBeSetWhenBuilt() {
@@ -40,7 +38,7 @@ public class KinesisFirehoseSinkBuilderTest {
                                         .setDeliveryStreamName("deliveryStream")
                                         .build())
                 .withMessageContaining(
-                        "ElementConverter must be not null when initializing the AsyncSinkBase.");
+                        "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
     }
 
     @Test
@@ -49,7 +47,7 @@ public class KinesisFirehoseSinkBuilderTest {
                 .isThrownBy(
                         () ->
                                 KinesisFirehoseSink.<String>builder()
-                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .setSerializationSchema(SERIALIZATION_SCHEMA)
                                         .build())
                 .withMessageContaining(
                         "The delivery stream name must not be null when initializing the KDF Sink.");
@@ -62,7 +60,7 @@ public class KinesisFirehoseSinkBuilderTest {
                         () ->
                                 KinesisFirehoseSink.<String>builder()
                                         .setDeliveryStreamName("")
-                                        .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+                                        .setSerializationSchema(SERIALIZATION_SCHEMA)
                                         .build())
                 .withMessageContaining(
                         "The delivery stream name must be set when initializing the KDF Sink.");
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
index ed0b1c7..221f444 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -35,7 +35,7 @@ public class KinesisFirehoseSinkElementConverterTest {
         Assertions.assertThatExceptionOfType(NullPointerException.class)
                 .isThrownBy(() -> KinesisFirehoseSinkElementConverter.<String>builder().build())
                 .withMessageContaining(
-                        "No SerializationSchema was supplied to the KinesisFirehoseSinkElementConverter builder.");
+                        "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
     }
 
     @Test
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
index 08cb49b..8809437 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
@@ -19,7 +19,6 @@ package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.connector.aws.testutils.LocalstackContainer;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.DockerImageVersions;
@@ -35,12 +34,12 @@ import org.slf4j.LoggerFactory;
 import org.testcontainers.utility.DockerImageName;
 import software.amazon.awssdk.core.SdkSystemSetting;
 import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
-import software.amazon.awssdk.services.firehose.model.Record;
 import software.amazon.awssdk.services.iam.IamAsyncClient;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.utils.ImmutableMap;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket;
@@ -49,6 +48,7 @@ import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getC
 import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getIamClient;
 import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getS3Client;
 import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.readObjectsFromS3Bucket;
 import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream;
 import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -56,22 +56,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */
 public class KinesisFirehoseSinkITCase {
 
-    private static final ElementConverter<String, Record> elementConverter =
-            KinesisFirehoseSinkElementConverter.<String>builder()
-                    .setSerializationSchema(new SimpleStringSchema())
-                    .build();
-
     private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
-    private S3AsyncClient s3AsyncClient;
-    private FirehoseAsyncClient firehoseAsyncClient;
-    private IamAsyncClient iamAsyncClient;
-
     private static final String ROLE_NAME = "super-role";
     private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME;
     private static final String BUCKET_NAME = "s3-firehose";
     private static final String STREAM_NAME = "s3-stream";
     private static final int NUMBER_OF_ELEMENTS = 92;
 
+    private S3AsyncClient s3AsyncClient;
+    private FirehoseAsyncClient firehoseAsyncClient;
+    private IamAsyncClient iamAsyncClient;
+
     @ClassRule
     public static LocalstackContainer mockFirehoseContainer =
             new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
@@ -106,10 +101,15 @@ public class KinesisFirehoseSinkITCase {
                         .map(Object::toString)
                         .returns(String.class)
                         .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+        List<String> expectedElements = new ArrayList<>();
+        for (int i = 1; i <= NUMBER_OF_ELEMENTS; i++) { // inclusive: one expectation per object
+            expectedElements.add(
+                    mapper.writeValueAsString(ImmutableMap.of("data", String.valueOf(i))));
+        }
 
         KinesisFirehoseSink<String> kdsSink =
                 KinesisFirehoseSink.<String>builder()
-                        .setElementConverter(elementConverter)
+                        .setSerializationSchema(new SimpleStringSchema())
                         .setDeliveryStreamName(STREAM_NAME)
                         .setMaxBatchSize(1)
                         .setFirehoseClientProperties(getConfig(mockFirehoseContainer.getEndpoint()))
@@ -120,5 +120,12 @@ public class KinesisFirehoseSinkITCase {
 
         List<S3Object> objects = listBucketObjects(s3AsyncClient, BUCKET_NAME);
         assertThat(objects.size()).isEqualTo(NUMBER_OF_ELEMENTS);
+        assertThat(
+                        readObjectsFromS3Bucket(
+                                s3AsyncClient,
+                                objects,
+                                BUCKET_NAME,
+                                response -> response.asUtf8String())) // explicit UTF-8
+                .containsAll(expectedElements);
     }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
deleted file mode 100644
index 3e9d0ee..0000000
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.firehose.sink.examples;
-
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
-import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkElementConverter;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
-import software.amazon.awssdk.utils.ImmutableMap;
-
-import java.util.Properties;
-
-/**
- * An example application demonstrating how to use the {@link KinesisFirehoseSink} to sink into KDF.
- *
- * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
- * SDK 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_ACCESS_KEY}
- * through environment variables etc.
- */
-public class SinkIntoFirehose {
-
-    private static final KinesisFirehoseSinkElementConverter<String> elementConverter =
-            KinesisFirehoseSinkElementConverter.<String>builder()
-                    .setSerializationSchema(new SimpleStringSchema())
-                    .build();
-
-    public static void main(String[] args) throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(10_000);
-
-        DataStream<String> generator =
-                env.fromSequence(1, 10_000_000L)
-                        .map(Object::toString)
-                        .returns(String.class)
-                        .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
-
-        Properties sinkProperties = new Properties();
-        sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
-
-        KinesisFirehoseSink<String> kdfSink =
-                KinesisFirehoseSink.<String>builder()
-                        .setElementConverter(elementConverter)
-                        .setDeliveryStreamName("delivery-stream")
-                        .setMaxBatchSize(20)
-                        .setFirehoseClientProperties(sinkProperties)
-                        .build();
-
-        generator.sinkTo(kdfSink);
-
-        env.execute("KDF Async Sink Example Program");
-    }
-}