You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/02/01 15:35:51 UTC
[flink] 03/03: [FLINK-24228][connectors/firehose] Allows end users to supply a serialization schema rather than an ElementConverter, thereby encapsulating the Firehose `Record` from the user, verifying stream objects in KinesisFirehoseITCase
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9050fd56591cbcb3270dd40ea370ea605bda8d39
Author: Zichen Liu <zi...@amazon.com>
AuthorDate: Mon Jan 24 18:55:17 2022 +0000
[FLINK-24228][connectors/firehose] Allows end users to supply a serialization schema rather than an ElementConverter, thereby encapsulating the Firehose `Record` from the user, verifying stream objects in KinesisFirehoseITCase
---
.../docs/connectors/datastream/firehose.md | 56 +++++++---------
.../content/docs/connectors/datastream/firehose.md | 56 +++++++---------
.../aws/testutils/AWSServicesTestUtils.java | 44 +++++++++++++
.../firehose/sink/KinesisFirehoseSinkBuilder.java | 27 ++++++--
.../sink/KinesisFirehoseSinkElementConverter.java | 8 +--
.../firehose/sink/KinesisFirehoseSinkWriter.java | 9 +--
.../sink/KinesisFirehoseSinkBuilderTest.java | 16 ++---
.../KinesisFirehoseSinkElementConverterTest.java | 2 +-
.../firehose/sink/KinesisFirehoseSinkITCase.java | 31 +++++----
.../firehose/sink/examples/SinkIntoFirehose.java | 74 ----------------------
10 files changed, 143 insertions(+), 180 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/firehose.md b/docs/content.zh/docs/connectors/datastream/firehose.md
index ecc77ce..e958a7c 100644
--- a/docs/content.zh/docs/connectors/datastream/firehose.md
+++ b/docs/content.zh/docs/connectors/datastream/firehose.md
@@ -38,11 +38,6 @@ The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com
{{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
{{< tab "Java" >}}
```java
-KinesisFirehoseSinkElementConverter<String> elementConverter =
- KinesisFirehoseSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .build();
-
Properties sinkProperties = new Properties();
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
@@ -52,16 +47,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
KinesisFirehoseSink<String> kdfSink =
KinesisFirehoseSink.<String>builder()
- .setFirehoseClientProperties(sinkProperties) // Required
- .setElementConverter(elementConverter) // Required
- .setDeliveryStreamName("your-stream-name") // Required
- .setFailOnError(false) // Optional
- .setMaxBatchSize(500) // Optional
- .setMaxInFlightRequests(50) // Optional
- .setMaxBufferedRequests(10_000) // Optional
- .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional
- .setMaxTimeInBufferMS(5000) // Optional
- .setMaxRecordSizeInBytes(1000 * 1024) // Optional
+ .setFirehoseClientProperties(sinkProperties) // Required
+ .setSerializationSchema(new SimpleStringSchema()) // Required
+ .setDeliveryStreamName("your-stream-name") // Required
+ .setFailOnError(false) // Optional
+ .setMaxBatchSize(500) // Optional
+ .setMaxInFlightRequests(50) // Optional
+ .setMaxBufferedRequests(10_000) // Optional
+ .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional
+ .setMaxTimeInBufferMS(5000) // Optional
+ .setMaxRecordSizeInBytes(1000 * 1024) // Optional
.build();
flinkStream.sinkTo(kdfSink);
@@ -69,11 +64,6 @@ flinkStream.sinkTo(kdfSink);
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val elementConverter =
- KinesisFirehoseSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .build()
-
Properties sinkProperties = new Properties()
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
@@ -83,16 +73,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
val kdfSink =
KinesisFirehoseSink.<String>builder()
- .setFirehoseClientProperties(sinkProperties) // Required
- .setElementConverter(elementConverter) // Required
- .setDeliveryStreamName("your-stream-name") // Required
- .setFailOnError(false) // Optional
- .setMaxBatchSize(500) // Optional
- .setMaxInFlightRequests(50) // Optional
- .setMaxBufferedRequests(10_000) // Optional
- .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional
- .setMaxTimeInBufferMS(5000) // Optional
- .setMaxRecordSizeInBytes(1000 * 1024) // Optional
+ .setFirehoseClientProperties(sinkProperties) // Required
+ .setSerializationSchema(new SimpleStringSchema()) // Required
+ .setDeliveryStreamName("your-stream-name") // Required
+ .setFailOnError(false) // Optional
+ .setMaxBatchSize(500) // Optional
+ .setMaxInFlightRequests(50) // Optional
+ .setMaxBufferedRequests(10_000) // Optional
+ .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional
+ .setMaxTimeInBufferMS(5000) // Optional
+ .setMaxRecordSizeInBytes(1000 * 1024) // Optional
.build()
flinkStream.sinkTo(kdfSink)
@@ -102,14 +92,14 @@ flinkStream.sinkTo(kdfSink)
## Configurations
-Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`.
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`.
1. __setFirehoseClientProperties(Properties sinkProperties)__
* Required.
* Supplies credentials, region and other parameters to the Firehose client.
-2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__
+2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__
* Required.
- * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example.
+ * Supplies a serialization schema to the Sink. This schema is used to serialize elements before sending to Firehose.
3. __setDeliveryStreamName(String deliveryStreamName)__
* Required.
* Name of the delivery stream to sink to.
diff --git a/docs/content/docs/connectors/datastream/firehose.md b/docs/content/docs/connectors/datastream/firehose.md
index b224665..20beb61 100644
--- a/docs/content/docs/connectors/datastream/firehose.md
+++ b/docs/content/docs/connectors/datastream/firehose.md
@@ -38,11 +38,6 @@ The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com
{{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
{{< tab "Java" >}}
```java
-KinesisFirehoseSinkElementConverter<String> elementConverter =
- KinesisFirehoseSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .build();
-
Properties sinkProperties = new Properties();
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
@@ -52,16 +47,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
KinesisFirehoseSink<String> kdfSink =
KinesisFirehoseSink.<String>builder()
- .setFirehoseClientProperties(sinkProperties) // Required
- .setElementConverter(elementConverter) // Required
- .setDeliveryStreamName("your-stream-name") // Required
- .setFailOnError(false) // Optional
- .setMaxBatchSize(500) // Optional
- .setMaxInFlightRequests(50) // Optional
- .setMaxBufferedRequests(10_000) // Optional
- .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional
- .setMaxTimeInBufferMS(5000) // Optional
- .setMaxRecordSizeInBytes(1000 * 1024) // Optional
+ .setFirehoseClientProperties(sinkProperties) // Required
+ .setSerializationSchema(new SimpleStringSchema()) // Required
+ .setDeliveryStreamName("your-stream-name") // Required
+ .setFailOnError(false) // Optional
+ .setMaxBatchSize(500) // Optional
+ .setMaxInFlightRequests(50) // Optional
+ .setMaxBufferedRequests(10_000) // Optional
+ .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional
+ .setMaxTimeInBufferMS(5000) // Optional
+ .setMaxRecordSizeInBytes(1000 * 1024) // Optional
.build();
flinkStream.sinkTo(kdfSink);
@@ -69,11 +64,6 @@ flinkStream.sinkTo(kdfSink);
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val elementConverter =
- KinesisFirehoseSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .build()
-
Properties sinkProperties = new Properties()
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
@@ -83,16 +73,16 @@ sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_
val kdfSink =
KinesisFirehoseSink.<String>builder()
- .setFirehoseClientProperties(sinkProperties) // Required
- .setElementConverter(elementConverter) // Required
- .setDeliveryStreamName("your-stream-name") // Required
- .setFailOnError(false) // Optional
- .setMaxBatchSize(500) // Optional
- .setMaxInFlightRequests(50) // Optional
- .setMaxBufferedRequests(10_000) // Optional
- .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional
- .setMaxTimeInBufferMS(5000) // Optional
- .setMaxRecordSizeInBytes(1000 * 1024) // Optional
+ .setFirehoseClientProperties(sinkProperties) // Required
+ .setSerializationSchema(new SimpleStringSchema()) // Required
+ .setDeliveryStreamName("your-stream-name") // Required
+ .setFailOnError(false) // Optional
+ .setMaxBatchSize(500) // Optional
+ .setMaxInFlightRequests(50) // Optional
+ .setMaxBufferedRequests(10_000) // Optional
+ .setMaxBatchSizeInBytes(4 * 1024 * 1024) // Optional
+ .setMaxTimeInBufferMS(5000) // Optional
+ .setMaxRecordSizeInBytes(1000 * 1024) // Optional
.build()
flinkStream.sinkTo(kdfSink)
@@ -102,14 +92,14 @@ flinkStream.sinkTo(kdfSink)
## Configurations
-Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<String>builder()`.
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`.
1. __setFirehoseClientProperties(Properties sinkProperties)__
* Required.
* Supplies credentials, region and other parameters to the Firehose client.
-2. __setElementConverter(KinesisFirehoseSinkElementConverter elementConverter)__
+2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__
* Required.
- * Supplies a serialization schema to the output. May be built using the following builder `KinesisFirehoseSinkElementConverter.<String>builder()` as per the example.
+ * Supplies a serialization schema to the Sink. This schema is used to serialize elements before sending to Firehose.
3. __setDeliveryStreamName(String deliveryStreamName)__
* Required.
* Name of the delivery stream to sink to.
diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
index 543d81b..ff915fa 100644
--- a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
@@ -31,6 +33,8 @@ import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
@@ -44,6 +48,8 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
@@ -126,4 +132,42 @@ public class AWSServicesTestUtils {
CompletableFuture<ListObjectsResponse> res = s3.listObjects(listObjects);
return res.get().contents();
}
+
+ public static <T> List<T> readObjectsFromS3Bucket(
+ S3AsyncClient s3AsyncClient,
+ List<S3Object> objects,
+ String bucketName,
+ Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+ S3BucketReader bucketReader = new S3BucketReader(s3AsyncClient, bucketName);
+ return bucketReader.readObjects(objects, deserializer);
+ }
+
+ /** Helper class to read objects from S3. */
+ private static class S3BucketReader {
+ private final S3AsyncClient s3AsyncClient;
+ private final String bucketName;
+
+ public S3BucketReader(S3AsyncClient s3AsyncClient, String bucketName) {
+ this.s3AsyncClient = s3AsyncClient;
+ this.bucketName = bucketName;
+ }
+
+ public <T> List<T> readObjects(
+ List<S3Object> objectList,
+ Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+ return objectList.stream()
+ .map(object -> readObjectWithKey(object.key(), deserializer))
+ .collect(Collectors.toList());
+ }
+
+ public <T> T readObjectWithKey(
+ String key, Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+ GetObjectRequest getObjectRequest =
+ GetObjectRequest.builder().bucket(bucketName).key(key).build();
+ return s3AsyncClient
+ .getObject(getObjectRequest, AsyncResponseTransformer.toBytes())
+ .thenApply(deserializer)
+ .join();
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
index ee22e1f..a180abc 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.firehose.sink;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import software.amazon.awssdk.http.Protocol;
@@ -36,11 +37,6 @@ import static software.amazon.awssdk.http.Protocol.HTTP1_1;
* writes String values to a Kinesis Data Firehose delivery stream named delivery-stream-name.
*
* <pre>{@code
- * private static final KinesisFirehoseSinkElementConverter<String> elementConverter =
- * KinesisFirehoseSinkElementConverter.<String>builder()
- * .setSerializationSchema(new SimpleStringSchema())
- * .build();
- *
* Properties sinkProperties = new Properties();
* sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
*
@@ -50,6 +46,7 @@ import static software.amazon.awssdk.http.Protocol.HTTP1_1;
* .setDeliveryStreamName("delivery-stream-name")
* .setMaxBatchSize(20)
* .setFirehoseClientProperties(sinkProperties)
+ * .setSerializationSchema(new SimpleStringSchema())
* .build();
* }</pre>
*
@@ -73,7 +70,7 @@ public class KinesisFirehoseSinkBuilder<InputT>
private static final int DEFAULT_MAX_BATCH_SIZE = 500;
private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
- private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000;
+ private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000;
private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 4 * 1024 * 1024;
private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000 * 1024;
@@ -83,6 +80,7 @@ public class KinesisFirehoseSinkBuilder<InputT>
private Boolean failOnError;
private String deliveryStreamName;
private Properties firehoseClientProperties;
+ private SerializationSchema<InputT> serializationSchema;
KinesisFirehoseSinkBuilder() {}
@@ -100,6 +98,19 @@ public class KinesisFirehoseSinkBuilder<InputT>
}
/**
+ * Allows the user to specify a serialization schema to serialize each record to persist to
+ * Firehose.
+ *
+ * @param serializationSchema serialization schema to use
+ * @return {@link KinesisFirehoseSinkBuilder} itself
+ */
+ public KinesisFirehoseSinkBuilder<InputT> setSerializationSchema(
+ SerializationSchema<InputT> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ return this;
+ }
+
+ /**
* If writing to Kinesis Data Firehose results in a partial or full failure being returned, the
* job will fail immediately with a {@link KinesisFirehoseException} if failOnError is set.
*
@@ -134,7 +145,9 @@ public class KinesisFirehoseSinkBuilder<InputT>
@Override
public KinesisFirehoseSink<InputT> build() {
return new KinesisFirehoseSink<>(
- getElementConverter(),
+ KinesisFirehoseSinkElementConverter.<InputT>builder()
+ .setSerializationSchema(serializationSchema)
+ .build(),
Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
Optional.ofNullable(getMaxInFlightRequests())
.orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
index 45b4186..cca749c 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -17,7 +17,7 @@
package org.apache.flink.connector.firehose.sink;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -31,7 +31,7 @@ import software.amazon.awssdk.services.firehose.model.Record;
* needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
* {@link Record} that may be persisted.
*/
-@PublicEvolving
+@Internal
public class KinesisFirehoseSinkElementConverter<InputT>
implements ElementConverter<InputT, Record> {
@@ -54,7 +54,6 @@ public class KinesisFirehoseSinkElementConverter<InputT>
}
/** A builder for the KinesisFirehoseSinkElementConverter. */
- @PublicEvolving
public static class Builder<InputT> {
private SerializationSchema<InputT> serializationSchema;
@@ -68,8 +67,7 @@ public class KinesisFirehoseSinkElementConverter<InputT>
public KinesisFirehoseSinkElementConverter<InputT> build() {
Preconditions.checkNotNull(
serializationSchema,
- "No SerializationSchema was supplied to the "
- + "KinesisFirehoseSinkElementConverter builder.");
+ "No SerializationSchema was supplied to the " + "KinesisFirehoseSink builder.");
return new KinesisFirehoseSinkElementConverter<>(serializationSchema);
}
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index 4002ec3..b41103f 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -37,7 +37,6 @@ import software.amazon.awssdk.services.firehose.model.Record;
import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -116,7 +115,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
@Override
protected void submitRequestEntries(
- List<Record> requestEntries, Consumer<Collection<Record>> requestResult) {
+ List<Record> requestEntries, Consumer<List<Record>> requestResult) {
PutRecordBatchRequest batchRequest =
PutRecordBatchRequest.builder()
@@ -146,9 +145,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
}
private void handleFullyFailedRequest(
- Throwable err,
- List<Record> requestEntries,
- Consumer<Collection<Record>> requestResult) {
+ Throwable err, List<Record> requestEntries, Consumer<List<Record>> requestResult) {
LOG.warn(
"KDF Sink failed to persist {} entries to KDF first request was {}",
requestEntries.size(),
@@ -164,7 +161,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
private void handlePartiallyFailedRequest(
PutRecordBatchResponse response,
List<Record> requestEntries,
- Consumer<Collection<Record>> requestResult) {
+ Consumer<List<Record>> requestResult) {
LOG.warn(
"KDF Sink failed to persist {} entries to KDF first request was {}",
requestEntries.size(),
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
index 0dc85da..88f4329 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
@@ -17,19 +17,17 @@
package org.apache.flink.connector.firehose.sink;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.assertj.core.api.Assertions;
import org.junit.Test;
-import software.amazon.awssdk.services.firehose.model.Record;
/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */
public class KinesisFirehoseSinkBuilderTest {
- private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
- KinesisFirehoseSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .build();
+
+ private static final SerializationSchema<String> SERIALIZATION_SCHEMA =
+ new SimpleStringSchema();
@Test
public void elementConverterOfSinkMustBeSetWhenBuilt() {
@@ -40,7 +38,7 @@ public class KinesisFirehoseSinkBuilderTest {
.setDeliveryStreamName("deliveryStream")
.build())
.withMessageContaining(
- "ElementConverter must be not null when initializing the AsyncSinkBase.");
+ "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
}
@Test
@@ -49,7 +47,7 @@ public class KinesisFirehoseSinkBuilderTest {
.isThrownBy(
() ->
KinesisFirehoseSink.<String>builder()
- .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+ .setSerializationSchema(SERIALIZATION_SCHEMA)
.build())
.withMessageContaining(
"The delivery stream name must not be null when initializing the KDF Sink.");
@@ -62,7 +60,7 @@ public class KinesisFirehoseSinkBuilderTest {
() ->
KinesisFirehoseSink.<String>builder()
.setDeliveryStreamName("")
- .setElementConverter(ELEMENT_CONVERTER_PLACEHOLDER)
+ .setSerializationSchema(SERIALIZATION_SCHEMA)
.build())
.withMessageContaining(
"The delivery stream name must be set when initializing the KDF Sink.");
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
index ed0b1c7..221f444 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -35,7 +35,7 @@ public class KinesisFirehoseSinkElementConverterTest {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(() -> KinesisFirehoseSinkElementConverter.<String>builder().build())
.withMessageContaining(
- "No SerializationSchema was supplied to the KinesisFirehoseSinkElementConverter builder.");
+ "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
}
@Test
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
index 08cb49b..8809437 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
@@ -19,7 +19,6 @@ package org.apache.flink.connector.firehose.sink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.aws.testutils.LocalstackContainer;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.DockerImageVersions;
@@ -35,12 +34,12 @@ import org.slf4j.LoggerFactory;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
-import software.amazon.awssdk.services.firehose.model.Record;
import software.amazon.awssdk.services.iam.IamAsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.utils.ImmutableMap;
+import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket;
@@ -49,6 +48,7 @@ import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getC
import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getIamClient;
import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.getS3Client;
import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.readObjectsFromS3Bucket;
import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream;
import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.getFirehoseClient;
import static org.assertj.core.api.Assertions.assertThat;
@@ -56,22 +56,17 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */
public class KinesisFirehoseSinkITCase {
- private static final ElementConverter<String, Record> elementConverter =
- KinesisFirehoseSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .build();
-
private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
- private S3AsyncClient s3AsyncClient;
- private FirehoseAsyncClient firehoseAsyncClient;
- private IamAsyncClient iamAsyncClient;
-
private static final String ROLE_NAME = "super-role";
private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME;
private static final String BUCKET_NAME = "s3-firehose";
private static final String STREAM_NAME = "s3-stream";
private static final int NUMBER_OF_ELEMENTS = 92;
+ private S3AsyncClient s3AsyncClient;
+ private FirehoseAsyncClient firehoseAsyncClient;
+ private IamAsyncClient iamAsyncClient;
+
@ClassRule
public static LocalstackContainer mockFirehoseContainer =
new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
@@ -106,10 +101,15 @@ public class KinesisFirehoseSinkITCase {
.map(Object::toString)
.returns(String.class)
.map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+ List<String> expectedElements = new ArrayList<>();
+ for (int i = 1; i < NUMBER_OF_ELEMENTS; i++) {
+ expectedElements.add(
+ mapper.writeValueAsString(ImmutableMap.of("data", String.valueOf(i))));
+ }
KinesisFirehoseSink<String> kdsSink =
KinesisFirehoseSink.<String>builder()
- .setElementConverter(elementConverter)
+ .setSerializationSchema(new SimpleStringSchema())
.setDeliveryStreamName(STREAM_NAME)
.setMaxBatchSize(1)
.setFirehoseClientProperties(getConfig(mockFirehoseContainer.getEndpoint()))
@@ -120,5 +120,12 @@ public class KinesisFirehoseSinkITCase {
List<S3Object> objects = listBucketObjects(s3AsyncClient, BUCKET_NAME);
assertThat(objects.size()).isEqualTo(NUMBER_OF_ELEMENTS);
+ assertThat(
+ readObjectsFromS3Bucket(
+ s3AsyncClient,
+ objects,
+ BUCKET_NAME,
+ response -> new String(response.asByteArrayUnsafe())))
+ .containsAll(expectedElements);
}
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
deleted file mode 100644
index 3e9d0ee..0000000
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/examples/SinkIntoFirehose.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.firehose.sink.examples;
-
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
-import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkElementConverter;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
-import software.amazon.awssdk.utils.ImmutableMap;
-
-import java.util.Properties;
-
-/**
- * An example application demonstrating how to use the {@link KinesisFirehoseSink} to sink into KDF.
- *
- * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
- * SDK 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_ACCESS_KEY}
- * through environment variables etc.
- */
-public class SinkIntoFirehose {
-
- private static final KinesisFirehoseSinkElementConverter<String> elementConverter =
- KinesisFirehoseSinkElementConverter.<String>builder()
- .setSerializationSchema(new SimpleStringSchema())
- .build();
-
- public static void main(String[] args) throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(10_000);
-
- DataStream<String> generator =
- env.fromSequence(1, 10_000_000L)
- .map(Object::toString)
- .returns(String.class)
- .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
-
- Properties sinkProperties = new Properties();
- sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
-
- KinesisFirehoseSink<String> kdfSink =
- KinesisFirehoseSink.<String>builder()
- .setElementConverter(elementConverter)
- .setDeliveryStreamName("delivery-stream")
- .setMaxBatchSize(20)
- .setFirehoseClientProperties(sinkProperties)
- .build();
-
- generator.sinkTo(kdfSink);
-
- env.execute("KDF Async Sink Example Program");
- }
-}