You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/12/03 18:59:55 UTC
[flink-connector-aws] 03/08: [FLINK-29907][Connectors/Firehose] Externalize Amazon Firehose connectors from Flink repo
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit 5993e34901152396a937cee399ced3beef602f0f
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Fri Dec 2 09:31:35 2022 +0000
[FLINK-29907][Connectors/Firehose] Externalize Amazon Firehose connectors from Flink repo
---
.../54da9a7d-14d2-4632-a045-1dd8fc665c8f | 0
.../a6cbd99c-b115-447a-8f19-43c1094db549 | 6 +
.../archunit-violations/stored.rules | 4 +
flink-connector-aws-kinesis-firehose/pom.xml | 153 ++++++++++++
.../sink/KinesisFirehoseConfigConstants.java | 32 +++
.../firehose/sink/KinesisFirehoseException.java | 54 +++++
.../firehose/sink/KinesisFirehoseSink.java | 135 +++++++++++
.../firehose/sink/KinesisFirehoseSinkBuilder.java | 164 +++++++++++++
.../sink/KinesisFirehoseSinkElementConverter.java | 104 ++++++++
.../firehose/sink/KinesisFirehoseSinkWriter.java | 264 +++++++++++++++++++++
.../sink/KinesisFirehoseStateSerializer.java | 52 ++++
.../table/KinesisFirehoseConnectorOptions.java | 43 ++++
.../firehose/table/KinesisFirehoseDynamicSink.java | 183 ++++++++++++++
.../table/KinesisFirehoseDynamicTableFactory.java | 89 +++++++
.../util/KinesisFirehoseConnectorOptionUtils.java | 67 ++++++
.../org.apache.flink.table.factories.Factory | 16 ++
.../src/main/resources/log4j2.properties | 25 ++
.../architecture/TestCodeArchitectureTest.java | 40 ++++
.../sink/KinesisFirehoseSinkBuilderTest.java | 81 +++++++
.../KinesisFirehoseSinkElementConverterTest.java | 54 +++++
.../firehose/sink/KinesisFirehoseSinkITCase.java | 125 ++++++++++
.../firehose/sink/KinesisFirehoseSinkTest.java | 132 +++++++++++
.../sink/KinesisFirehoseSinkWriterTest.java | 106 +++++++++
.../sink/KinesisFirehoseStateSerializerTest.java | 56 +++++
.../sink/testutils/KinesisFirehoseTestUtils.java | 86 +++++++
.../KinesisFirehoseDynamicTableFactoryTest.java | 157 ++++++++++++
.../org.junit.jupiter.api.extension.Extension | 16 ++
.../src/test/resources/archunit.properties | 31 +++
.../src/test/resources/log4j2-test.properties | 28 +++
flink-sql-connector-aws-kinesis-firehose/pom.xml | 107 +++++++++
.../src/main/resources/META-INF/NOTICE | 48 ++++
pom.xml | 2 +
32 files changed, 2460 insertions(+)
diff --git a/flink-connector-aws-kinesis-firehose/archunit-violations/54da9a7d-14d2-4632-a045-1dd8fc665c8f b/flink-connector-aws-kinesis-firehose/archunit-violations/54da9a7d-14d2-4632-a045-1dd8fc665c8f
new file mode 100644
index 0000000..e69de29
diff --git a/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549 b/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
new file mode 100644
index 0000000..5ad7b14
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
@@ -0,0 +1,6 @@
+org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connector-aws-kinesis-firehose/archunit-violations/stored.rules b/flink-connector-aws-kinesis-firehose/archunit-violations/stored.rules
new file mode 100644
index 0000000..cf8b667
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Tue Feb 22 12:19:26 CET 2022
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=54da9a7d-14d2-4632-a045-1dd8fc665c8f
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=a6cbd99c-b115-447a-8f19-43c1094db549
diff --git a/flink-connector-aws-kinesis-firehose/pom.xml b/flink-connector-aws-kinesis-firehose/pom.xml
new file mode 100644
index 0000000..8fa9179
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/pom.xml
@@ -0,0 +1,153 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-parent</artifactId>
+ <version>4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
+ <name>Flink : Connectors : AWS : Amazon Kinesis Data Firehose</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>firehose</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>netty-nio-client</artifactId>
+ </dependency>
+
+ <!--Table Api Dependencies-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-base</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>iam</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- ArchUit test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-architecture-tests-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
new file mode 100644
index 0000000..527f74a
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseConfigConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Defaults for {@link KinesisFirehoseSinkWriter}. */
+@PublicEvolving
+public class KinesisFirehoseConfigConstants {
+
+ public static final String BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT =
+ "Apache Flink %s (%s) Firehose Connector";
+
+ /** Firehose identifier for user agent prefix. */
+ public static final String FIREHOSE_CLIENT_USER_AGENT_PREFIX =
+ "aws.firehose.client.user-agent-prefix";
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
new file mode 100644
index 0000000..e76c10b
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@link RuntimeException} wrapper indicating the exception was thrown from the Kinesis Data
+ * Firehose Sink.
+ */
+@PublicEvolving
+class KinesisFirehoseException extends RuntimeException {
+
+ public KinesisFirehoseException(final String message) {
+ super(message);
+ }
+
+ public KinesisFirehoseException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * When the flag {@code failOnError} is set in {@link KinesisFirehoseSinkWriter}, this exception
+ * is raised as soon as any exception occurs when writing to KDF.
+ */
+ static class KinesisFirehoseFailFastException extends KinesisFirehoseException {
+
+ private static final String ERROR_MESSAGE =
+ "Encountered an exception while persisting records, not retrying due to {failOnError} being set.";
+
+ public KinesisFirehoseFailFastException() {
+ super(ERROR_MESSAGE);
+ }
+
+ public KinesisFirehoseFailFastException(final Throwable cause) {
+ super(ERROR_MESSAGE, cause);
+ }
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
new file mode 100644
index 0000000..6f9ed54
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+
+/**
+ * A Kinesis Data Firehose (KDF) Sink that performs async requests against a destination delivery
+ * stream using the buffering protocol specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link
+ * software.amazon.awssdk.services.firehose.FirehoseAsyncClient} to communicate with the AWS
+ * endpoint.
+ *
+ * <p>Please see the writer implementation in {@link KinesisFirehoseSinkWriter}
+ *
+ * @param <InputT> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class KinesisFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> {
+
+ private final boolean failOnError;
+ private final String deliveryStreamName;
+ private final Properties firehoseClientProperties;
+
+ KinesisFirehoseSink(
+ ElementConverter<InputT, Record> elementConverter,
+ Integer maxBatchSize,
+ Integer maxInFlightRequests,
+ Integer maxBufferedRequests,
+ Long maxBatchSizeInBytes,
+ Long maxTimeInBufferMS,
+ Long maxRecordSizeInBytes,
+ boolean failOnError,
+ String deliveryStreamName,
+ Properties firehoseClientProperties) {
+ super(
+ elementConverter,
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBatchSizeInBytes,
+ maxTimeInBufferMS,
+ maxRecordSizeInBytes);
+ this.deliveryStreamName =
+ Preconditions.checkNotNull(
+ deliveryStreamName,
+ "The delivery stream name must not be null when initializing the KDF Sink.");
+ Preconditions.checkArgument(
+ !this.deliveryStreamName.isEmpty(),
+ "The delivery stream name must be set when initializing the KDF Sink.");
+ this.failOnError = failOnError;
+ this.firehoseClientProperties = firehoseClientProperties;
+ }
+
+ /**
+ * Create a {@link KinesisFirehoseSinkBuilder} to allow the fluent construction of a new {@code
+ * KinesisFirehoseSink}.
+ *
+ * @param <InputT> type of incoming records
+ * @return {@link KinesisFirehoseSinkBuilder}
+ */
+ public static <InputT> KinesisFirehoseSinkBuilder<InputT> builder() {
+ return new KinesisFirehoseSinkBuilder<>();
+ }
+
+ @Override
+ public StatefulSinkWriter<InputT, BufferedRequestState<Record>> createWriter(
+ InitContext context) throws IOException {
+ return new KinesisFirehoseSinkWriter<>(
+ getElementConverter(),
+ context,
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBatchSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ getMaxRecordSizeInBytes(),
+ failOnError,
+ deliveryStreamName,
+ firehoseClientProperties,
+ Collections.emptyList());
+ }
+
+ @Override
+ public StatefulSinkWriter<InputT, BufferedRequestState<Record>> restoreWriter(
+ InitContext context, Collection<BufferedRequestState<Record>> recoveredState)
+ throws IOException {
+ return new KinesisFirehoseSinkWriter<>(
+ getElementConverter(),
+ context,
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBatchSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ getMaxRecordSizeInBytes(),
+ failOnError,
+ deliveryStreamName,
+ firehoseClientProperties,
+ recoveredState);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<BufferedRequestState<Record>> getWriterStateSerializer() {
+ return new KinesisFirehoseStateSerializer();
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
new file mode 100644
index 0000000..087fd6b
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link KinesisFirehoseSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link KinesisFirehoseSink} that
+ * writes String values to a Kinesis Data Firehose delivery stream named delivery-stream-name.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * KinesisFirehoseSink<String> kdfSink =
+ * KinesisFirehoseSink.<String>builder()
+ * .setElementConverter(elementConverter)
+ * .setDeliveryStreamName("delivery-stream-name")
+ * .setMaxBatchSize(20)
+ * .setFirehoseClientProperties(sinkProperties)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ * <li>{@code maxBatchSize} will be 500
+ * <li>{@code maxInFlightRequests} will be 50
+ * <li>{@code maxBufferedRequests} will be 10000
+ * <li>{@code maxBatchSizeInBytes} will be 4 MB i.e. {@code 4 * 1024 * 1024}
+ * <li>{@code maxTimeInBufferMS} will be 5000ms
+ * <li>{@code maxRecordSizeInBytes} will be 1000 KB i.e. {@code 1000 * 1024}
+ * <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class KinesisFirehoseSinkBuilder<InputT>
+ extends AsyncSinkBaseBuilder<InputT, Record, KinesisFirehoseSinkBuilder<InputT>> {
+
+ private static final int DEFAULT_MAX_BATCH_SIZE = 500;
+ private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+ private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000;
+ private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 4 * 1024 * 1024;
+ private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+ private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000 * 1024;
+ private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+ private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+ private Boolean failOnError;
+ private String deliveryStreamName;
+ private Properties firehoseClientProperties;
+ private SerializationSchema<InputT> serializationSchema;
+
+ KinesisFirehoseSinkBuilder() {}
+
+ /**
+ * Sets the name of the KDF delivery stream that the sink will connect to. There is no default
+ * for this parameter, therefore, this must be provided at sink creation time otherwise the
+ * build will fail.
+ *
+ * @param deliveryStreamName the name of the delivery stream
+ * @return {@link KinesisFirehoseSinkBuilder} itself
+ */
+ public KinesisFirehoseSinkBuilder<InputT> setDeliveryStreamName(String deliveryStreamName) {
+ this.deliveryStreamName = deliveryStreamName;
+ return this;
+ }
+
+ /**
+ * Allows the user to specify a serialization schema to serialize each record to persist to
+ * Firehose.
+ *
+ * @param serializationSchema serialization schema to use
+ * @return {@link KinesisFirehoseSinkBuilder} itself
+ */
+ public KinesisFirehoseSinkBuilder<InputT> setSerializationSchema(
+ SerializationSchema<InputT> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ return this;
+ }
+
+ /**
+ * If writing to Kinesis Data Firehose results in a partial or full failure being returned, the
+ * job will fail immediately with a {@link KinesisFirehoseException} if failOnError is set.
+ *
+ * @param failOnError whether to fail on error
+ * @return {@link KinesisFirehoseSinkBuilder} itself
+ */
+ public KinesisFirehoseSinkBuilder<InputT> setFailOnError(boolean failOnError) {
+ this.failOnError = failOnError;
+ return this;
+ }
+
+ /**
+ * A set of properties used by the sink to create the firehose client. This may be used to set
+ * the aws region, credentials etc. See the docs for usage and syntax.
+ *
+ * @param firehoseClientProperties Firehose client properties
+ * @return {@link KinesisFirehoseSinkBuilder} itself
+ */
+ public KinesisFirehoseSinkBuilder<InputT> setFirehoseClientProperties(
+ Properties firehoseClientProperties) {
+ this.firehoseClientProperties = firehoseClientProperties;
+ return this;
+ }
+
+ @VisibleForTesting
+ Properties getClientPropertiesWithDefaultHttpProtocol() {
+ Properties clientProperties =
+ Optional.ofNullable(firehoseClientProperties).orElse(new Properties());
+ clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL.toString());
+ return clientProperties;
+ }
+
+ @Override
+ public KinesisFirehoseSink<InputT> build() {
+ return new KinesisFirehoseSink<>(
+ KinesisFirehoseSinkElementConverter.<InputT>builder()
+ .setSerializationSchema(serializationSchema)
+ .build(),
+ Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
+ Optional.ofNullable(getMaxInFlightRequests())
+ .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
+ Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
+ Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
+ Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
+ Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
+ Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR),
+ deliveryStreamName,
+ getClientPropertiesWithDefaultHttpProtocol());
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
new file mode 100644
index 0000000..b90db33
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS Kinesis SDK v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
+ * {@link Record} that may be persisted.
+ */
+@Internal
+public class KinesisFirehoseSinkElementConverter<InputT>
+ implements ElementConverter<InputT, Record> {
+ private boolean schemaOpened = false;
+
+ /** A serialization schema to specify how the input element should be serialized. */
+ private final SerializationSchema<InputT> serializationSchema;
+
+ private KinesisFirehoseSinkElementConverter(SerializationSchema<InputT> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ }
+
+ @Override
+ public Record apply(InputT element, SinkWriter.Context context) {
+ checkOpened();
+ return Record.builder()
+ .data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
+ .build();
+ }
+
+ private void checkOpened() {
+ if (!schemaOpened) {
+ try {
+ serializationSchema.open(
+ new SerializationSchema.InitializationContext() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(
+ KinesisFirehoseSinkElementConverter.class.getClassLoader());
+ }
+ });
+ schemaOpened = true;
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+ }
+ }
+ }
+
+ public static <InputT> Builder<InputT> builder() {
+ return new Builder<>();
+ }
+
+ /** A builder for the KinesisFirehoseSinkElementConverter. */
+ public static class Builder<InputT> {
+
+ private SerializationSchema<InputT> serializationSchema;
+
+ public Builder<InputT> setSerializationSchema(
+ SerializationSchema<InputT> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ return this;
+ }
+
+ public KinesisFirehoseSinkElementConverter<InputT> build() {
+ Preconditions.checkNotNull(
+ serializationSchema,
+ "No SerializationSchema was supplied to the " + "KinesisFirehoseSink builder.");
+ return new KinesisFirehoseSinkElementConverter<>(serializationSchema);
+ }
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
new file mode 100644
index 0000000..c16018d
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
+import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
+import software.amazon.awssdk.services.firehose.model.Record;
+import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link KinesisFirehoseSink} to write to Kinesis Data Firehose. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * KinesisFirehoseSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * <p>The {@link FirehoseAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkWriter.class);
+
+ private static SdkAsyncHttpClient createHttpClient(Properties firehoseClientProperties) {
+ return AWSGeneralUtil.createAsyncHttpClient(firehoseClientProperties);
+ }
+
+ private static FirehoseAsyncClient createFirehoseClient(
+ Properties firehoseClientProperties, SdkAsyncHttpClient httpClient) {
+ AWSGeneralUtil.validateAwsCredentials(firehoseClientProperties);
+ return AWSAsyncSinkUtil.createAwsAsyncClient(
+ firehoseClientProperties,
+ httpClient,
+ FirehoseAsyncClient.builder(),
+ KinesisFirehoseConfigConstants.BASE_FIREHOSE_USER_AGENT_PREFIX_FORMAT,
+ KinesisFirehoseConfigConstants.FIREHOSE_CLIENT_USER_AGENT_PREFIX);
+ }
+
+ private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER =
+ FatalExceptionClassifier.withRootCauseOfType(
+ ResourceNotFoundException.class,
+ err ->
+ new KinesisFirehoseException(
+ "Encountered non-recoverable exception relating to not being able to find the specified resources",
+ err));
+
+ private static final FatalExceptionClassifier FIREHOSE_FATAL_EXCEPTION_CLASSIFIER =
+ FatalExceptionClassifier.createChain(
+ getInterruptedExceptionClassifier(),
+ getInvalidCredentialsExceptionClassifier(),
+ RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
+ getSdkClientMisconfiguredExceptionClassifier());
+
+ private final Counter numRecordsOutErrorsCounter;
+
+ /* Name of the delivery stream in Kinesis Data Firehose */
+ private final String deliveryStreamName;
+
+ /* The sink writer metric group */
+ private final SinkWriterMetricGroup metrics;
+
+ /* The asynchronous http client */
+ private final SdkAsyncHttpClient httpClient;
+
+ /* The asynchronous Firehose client */
+ private final FirehoseAsyncClient firehoseClient;
+
+ /* Flag to whether fatally fail any time we encounter an exception when persisting records */
+ private final boolean failOnError;
+
+ KinesisFirehoseSinkWriter(
+ ElementConverter<InputT, Record> elementConverter,
+ Sink.InitContext context,
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long maxBatchSizeInBytes,
+ long maxTimeInBufferMS,
+ long maxRecordSizeInBytes,
+ boolean failOnError,
+ String deliveryStreamName,
+ Properties firehoseClientProperties) {
+ this(
+ elementConverter,
+ context,
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBatchSizeInBytes,
+ maxTimeInBufferMS,
+ maxRecordSizeInBytes,
+ failOnError,
+ deliveryStreamName,
+ firehoseClientProperties,
+ Collections.emptyList());
+ }
+
+ KinesisFirehoseSinkWriter(
+ ElementConverter<InputT, Record> elementConverter,
+ Sink.InitContext context,
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long maxBatchSizeInBytes,
+ long maxTimeInBufferMS,
+ long maxRecordSizeInBytes,
+ boolean failOnError,
+ String deliveryStreamName,
+ Properties firehoseClientProperties,
+ Collection<BufferedRequestState<Record>> initialStates) {
+ super(
+ elementConverter,
+ context,
+ AsyncSinkWriterConfiguration.builder()
+ .setMaxBatchSize(maxBatchSize)
+ .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+ .setMaxInFlightRequests(maxInFlightRequests)
+ .setMaxBufferedRequests(maxBufferedRequests)
+ .setMaxTimeInBufferMS(maxTimeInBufferMS)
+ .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+ .build(),
+ initialStates);
+ this.failOnError = failOnError;
+ this.deliveryStreamName = deliveryStreamName;
+ this.metrics = context.metricGroup();
+ this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
+ this.httpClient = createHttpClient(firehoseClientProperties);
+ this.firehoseClient = createFirehoseClient(firehoseClientProperties, httpClient);
+ }
+
+ @Override
+ protected void submitRequestEntries(
+ List<Record> requestEntries, Consumer<List<Record>> requestResult) {
+
+ PutRecordBatchRequest batchRequest =
+ PutRecordBatchRequest.builder()
+ .records(requestEntries)
+ .deliveryStreamName(deliveryStreamName)
+ .build();
+
+ CompletableFuture<PutRecordBatchResponse> future =
+ firehoseClient.putRecordBatch(batchRequest);
+
+ future.whenComplete(
+ (response, err) -> {
+ if (err != null) {
+ handleFullyFailedRequest(err, requestEntries, requestResult);
+ } else if (response.failedPutCount() > 0) {
+ handlePartiallyFailedRequest(response, requestEntries, requestResult);
+ } else {
+ requestResult.accept(Collections.emptyList());
+ }
+ });
+ }
+
+ @Override
+ protected long getSizeInBytes(Record requestEntry) {
+ return requestEntry.data().asByteArrayUnsafe().length;
+ }
+
+ @Override
+ public void close() {
+ AWSGeneralUtil.closeResources(httpClient, firehoseClient);
+ }
+
+ private void handleFullyFailedRequest(
+ Throwable err, List<Record> requestEntries, Consumer<List<Record>> requestResult) {
+ LOG.debug(
+ "KDF Sink failed to write and will retry {} entries to KDF first request was {}",
+ requestEntries.size(),
+ requestEntries.get(0).toString(),
+ err);
+ numRecordsOutErrorsCounter.inc(requestEntries.size());
+
+ if (isRetryable(err)) {
+ requestResult.accept(requestEntries);
+ }
+ }
+
+ private void handlePartiallyFailedRequest(
+ PutRecordBatchResponse response,
+ List<Record> requestEntries,
+ Consumer<List<Record>> requestResult) {
+ LOG.debug(
+ "KDF Sink failed to write and will retry {} entries to KDF first request was {}",
+ requestEntries.size(),
+ requestEntries.get(0).toString());
+ numRecordsOutErrorsCounter.inc(response.failedPutCount());
+
+ if (failOnError) {
+ getFatalExceptionCons()
+ .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException());
+ return;
+ }
+ List<Record> failedRequestEntries = new ArrayList<>(response.failedPutCount());
+ List<PutRecordBatchResponseEntry> records = response.requestResponses();
+
+ for (int i = 0; i < records.size(); i++) {
+ if (records.get(i).errorCode() != null) {
+ failedRequestEntries.add(requestEntries.get(i));
+ }
+ }
+
+ requestResult.accept(failedRequestEntries);
+ }
+
+ private boolean isRetryable(Throwable err) {
+ if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
+ return false;
+ }
+ if (failOnError) {
+ getFatalExceptionCons()
+ .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializer.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializer.java
new file mode 100644
index 0000000..36162e6
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Kinesis Firehose implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class KinesisFirehoseStateSerializer extends AsyncSinkWriterStateSerializer<Record> {
+ @Override
+ protected void serializeRequestToStream(Record request, DataOutputStream out)
+ throws IOException {
+ out.write(request.data().asByteArrayUnsafe());
+ }
+
+ @Override
+ protected Record deserializeRequestFromStream(long requestSize, DataInputStream in)
+ throws IOException {
+ byte[] requestData = new byte[(int) requestSize];
+ in.read(requestData);
+ return Record.builder().data(SdkBytes.fromByteArray(requestData)).build();
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseConnectorOptions.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseConnectorOptions.java
new file mode 100644
index 0000000..92773c6
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseConnectorOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
+
+/** Options for the Kinesis firehose connector. */
+@PublicEvolving
+public class KinesisFirehoseConnectorOptions extends AsyncSinkConnectorOptions {
+
+ public static final ConfigOption<String> DELIVERY_STREAM =
+ ConfigOptions.key("delivery-stream")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the Kinesis Firehose delivery stream backing this table.");
+
+ public static final ConfigOption<Boolean> SINK_FAIL_ON_ERROR =
+ ConfigOptions.key("sink.fail-on-error")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional fail on error value for kinesis Firehose sink, default is false");
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
new file mode 100644
index 0000000..149dbaf
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/** Kinesis firehose backed {@link AsyncDynamicTableSink}. */
+@Internal
+public class KinesisFirehoseDynamicSink extends AsyncDynamicTableSink<Record> {
+
+ /** Consumed data type of the table. */
+ private final DataType consumedDataType;
+
+ /** The Firehose delivery stream to write to. */
+ private final String deliveryStream;
+
+ /** Properties for the Firehose DataStream Sink. */
+ private final Properties firehoseClientProperties;
+
+ /** Sink format for encoding records to Kinesis. */
+ private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
+
+ private final Boolean failOnError;
+
+ protected KinesisFirehoseDynamicSink(
+ @Nullable Integer maxBatchSize,
+ @Nullable Integer maxInFlightRequests,
+ @Nullable Integer maxBufferedRequests,
+ @Nullable Long maxBufferSizeInBytes,
+ @Nullable Long maxTimeInBufferMS,
+ @Nullable Boolean failOnError,
+ @Nullable DataType consumedDataType,
+ String deliveryStream,
+ @Nullable Properties firehoseClientProperties,
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ super(
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBufferSizeInBytes,
+ maxTimeInBufferMS);
+ this.failOnError = failOnError;
+ this.firehoseClientProperties = firehoseClientProperties;
+ this.consumedDataType =
+ Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null");
+ this.deliveryStream =
+ Preconditions.checkNotNull(
+ deliveryStream, "Firehose Delivery stream name must not be null");
+ this.encodingFormat =
+ Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null");
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return encodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ SerializationSchema<RowData> serializationSchema =
+ encodingFormat.createRuntimeEncoder(context, consumedDataType);
+
+ KinesisFirehoseSinkBuilder<RowData> builder =
+ KinesisFirehoseSink.<RowData>builder()
+ .setSerializationSchema(serializationSchema)
+ .setFirehoseClientProperties(firehoseClientProperties)
+ .setDeliveryStreamName(deliveryStream);
+
+ Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError);
+ super.addAsyncOptionsToSinkBuilder(builder);
+
+ return SinkV2Provider.of(builder.build());
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new KinesisFirehoseDynamicSink(
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBufferSizeInBytes,
+ maxTimeInBufferMS,
+ failOnError,
+ consumedDataType,
+ deliveryStream,
+ firehoseClientProperties,
+ encodingFormat);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "firehose";
+ }
+
+ /** Builder class for {@link KinesisFirehoseDynamicSink}. */
+ @Internal
+ public static class KinesisFirehoseDynamicSinkBuilder
+ extends AsyncDynamicTableSinkBuilder<Record, KinesisFirehoseDynamicSinkBuilder> {
+
+ private DataType consumedDataType = null;
+ private String deliveryStream = null;
+ private Properties firehoseClientProperties = null;
+ private EncodingFormat<SerializationSchema<RowData>> encodingFormat = null;
+ private Boolean failOnError = null;
+
+ public KinesisFirehoseDynamicSinkBuilder setConsumedDataType(DataType consumedDataType) {
+ this.consumedDataType = consumedDataType;
+ return this;
+ }
+
+ public KinesisFirehoseDynamicSinkBuilder setDeliveryStream(String deliveryStream) {
+ this.deliveryStream = deliveryStream;
+ return this;
+ }
+
+ public KinesisFirehoseDynamicSinkBuilder setFirehoseClientProperties(
+ Properties firehoseClientProperties) {
+ this.firehoseClientProperties = firehoseClientProperties;
+ return this;
+ }
+
+ public KinesisFirehoseDynamicSinkBuilder setEncodingFormat(
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ this.encodingFormat = encodingFormat;
+ return this;
+ }
+
+ public KinesisFirehoseDynamicSinkBuilder setFailOnError(Boolean failOnError) {
+ this.failOnError = failOnError;
+ return this;
+ }
+
+ @Override
+ public KinesisFirehoseDynamicSink build() {
+ return new KinesisFirehoseDynamicSink(
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBufferSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ failOnError,
+ consumedDataType,
+ deliveryStream,
+ firehoseClientProperties,
+ encodingFormat);
+ }
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java
new file mode 100644
index 0000000..a7ca38e
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
+import org.apache.flink.connector.firehose.table.util.KinesisFirehoseConnectorOptionUtils;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.DELIVERY_STREAM;
+import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.SINK_FAIL_ON_ERROR;
+import static org.apache.flink.connector.firehose.table.util.KinesisFirehoseConnectorOptionUtils.FIREHOSE_CLIENT_PROPERTIES_KEY;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+
+/** Factory for creating {@link KinesisFirehoseDynamicSink} . */
+@Internal
+public class KinesisFirehoseDynamicTableFactory extends AsyncDynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "firehose";
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+
+ AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context);
+
+ KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder builder =
+ new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder();
+
+ KinesisFirehoseConnectorOptionUtils optionsUtils =
+ new KinesisFirehoseConnectorOptionUtils(
+ factoryContext.getResolvedOptions(), factoryContext.getTableOptions());
+ // validate the data types of the table options
+ factoryContext
+ .getFactoryHelper()
+ .validateExcept(optionsUtils.getNonValidatedPrefixes().toArray(new String[0]));
+ Properties properties = optionsUtils.getSinkProperties();
+
+ builder.setDeliveryStream((String) properties.get(DELIVERY_STREAM.key()))
+ .setFirehoseClientProperties(
+ (Properties) properties.get(FIREHOSE_CLIENT_PROPERTIES_KEY))
+ .setEncodingFormat(factoryContext.getEncodingFormat())
+ .setConsumedDataType(factoryContext.getPhysicalDataType());
+ Optional.ofNullable((Boolean) properties.get(SINK_FAIL_ON_ERROR.key()))
+ .ifPresent(builder::setFailOnError);
+ return super.addAsyncOptionsToBuilder(properties, builder).build();
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(DELIVERY_STREAM);
+ options.add(FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = super.optionalOptions();
+ options.add(SINK_FAIL_ON_ERROR);
+ return options;
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/util/KinesisFirehoseConnectorOptionUtils.java b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/util/KinesisFirehoseConnectorOptionUtils.java
new file mode 100644
index 0000000..bd1ccfb
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/util/KinesisFirehoseConnectorOptionUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.table.util.AsyncClientOptionsUtils;
+import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.DELIVERY_STREAM;
+import static org.apache.flink.connector.firehose.table.KinesisFirehoseConnectorOptions.SINK_FAIL_ON_ERROR;
+
+/** Class for extracting firehose configurations from table options. */
+@Internal
+public class KinesisFirehoseConnectorOptionUtils {
+
+ public static final String FIREHOSE_CLIENT_PROPERTIES_KEY = "sink.client.properties";
+
+ private final AsyncSinkConfigurationValidator asyncSinkConfigurationValidator;
+ private final AsyncClientOptionsUtils asyncClientOptionsUtils;
+ private final Map<String, String> resolvedOptions;
+ private final ReadableConfig tableOptions;
+
+ public KinesisFirehoseConnectorOptionUtils(
+ Map<String, String> resolvedOptions, ReadableConfig tableOptions) {
+ this.asyncSinkConfigurationValidator = new AsyncSinkConfigurationValidator(tableOptions);
+ this.asyncClientOptionsUtils = new AsyncClientOptionsUtils(resolvedOptions);
+ this.resolvedOptions = resolvedOptions;
+ this.tableOptions = tableOptions;
+ }
+
+ public List<String> getNonValidatedPrefixes() {
+ return this.asyncClientOptionsUtils.getNonValidatedPrefixes();
+ }
+
+ public Properties getSinkProperties() {
+ Properties properties = asyncSinkConfigurationValidator.getValidatedConfigurations();
+ properties.put(DELIVERY_STREAM.key(), tableOptions.get(DELIVERY_STREAM));
+ Properties kinesisClientProps = asyncClientOptionsUtils.getValidatedConfigurations();
+ properties.put(FIREHOSE_CLIENT_PROPERTIES_KEY, kinesisClientProps);
+ if (tableOptions.getOptional(SINK_FAIL_ON_ERROR).isPresent()) {
+ properties.put(
+ SINK_FAIL_ON_ERROR.key(), tableOptions.getOptional(SINK_FAIL_ON_ERROR).get());
+ }
+ return properties;
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-aws-kinesis-firehose/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..2147c30
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.firehose.table.KinesisFirehoseDynamicTableFactory
diff --git a/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties b/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..c64a340
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
new file mode 100644
index 0000000..d7fbc74
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+/** Architecture tests for test code. */
+@AnalyzeClasses(
+ packages = "org.apache.flink.connector.firehose",
+ importOptions = {
+ ImportOption.OnlyIncludeTests.class,
+ ImportOptions.ExcludeScalaImportOption.class,
+ ImportOptions.ExcludeShadedImportOption.class
+ })
+public class TestCodeArchitectureTest {
+
+ @ArchTest
+ public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
new file mode 100644
index 0000000..e375193
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */
+class KinesisFirehoseSinkBuilderTest {
+
+ private static final SerializationSchema<String> SERIALIZATION_SCHEMA =
+ new SimpleStringSchema();
+
+ @Test
+ void elementConverterOfSinkMustBeSetWhenBuilt() {
+ Assertions.assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(
+ () ->
+ KinesisFirehoseSink.builder()
+ .setDeliveryStreamName("deliveryStream")
+ .build())
+ .withMessageContaining(
+ "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
+ }
+
+ @Test
+ void streamNameOfSinkMustBeSetWhenBuilt() {
+ Assertions.assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(
+ () ->
+ KinesisFirehoseSink.<String>builder()
+ .setSerializationSchema(SERIALIZATION_SCHEMA)
+ .build())
+ .withMessageContaining(
+ "The delivery stream name must not be null when initializing the KDF Sink.");
+ }
+
+ @Test
+ void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ KinesisFirehoseSink.<String>builder()
+ .setDeliveryStreamName("")
+ .setSerializationSchema(SERIALIZATION_SCHEMA)
+ .build())
+ .withMessageContaining(
+ "The delivery stream name must be set when initializing the KDF Sink.");
+ }
+
+ @Test
+ void defaultProtocolVersionInsertedToConfiguration() {
+ Properties expectedProps = new Properties();
+ expectedProps.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
+ Properties defaultProperties =
+ KinesisFirehoseSink.<String>builder().getClientPropertiesWithDefaultHttpProtocol();
+
+ Assertions.assertThat(defaultProperties).isEqualTo(expectedProps);
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
new file mode 100644
index 0000000..ccb4582
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Covers construction and sanity checking of {@link KinesisFirehoseSinkElementConverter}. */
+class KinesisFirehoseSinkElementConverterTest {
+
+ @Test
+ void elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt() {
+ Assertions.assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(() -> KinesisFirehoseSinkElementConverter.<String>builder().build())
+ .withMessageContaining(
+ "No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
+ }
+
+ @Test
+ void elementConverterUsesProvidedSchemaToSerializeRecord() {
+ ElementConverter<String, Record> elementConverter =
+ KinesisFirehoseSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ String testString = "{many hands make light work;";
+
+ Record serializedRecord = elementConverter.apply(testString, null);
+ byte[] serializedString = (new SimpleStringSchema()).serialize(testString);
+ assertThat(serializedRecord.data()).isEqualTo(SdkBytes.fromByteArray(serializedString));
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
new file mode 100644
index 0000000..2e4105a
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
+import software.amazon.awssdk.services.iam.IamClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.util.List;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createBucket;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIAMRole;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createIamClient;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.listBucketObjects;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.readObjectsFromS3Bucket;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createDeliveryStream;
+import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createFirehoseClient;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */
+@Testcontainers
+class KinesisFirehoseSinkITCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
+ private static final String ROLE_NAME = "super-role";
+ private static final String ROLE_ARN = "arn:aws:iam::000000000000:role/" + ROLE_NAME;
+ private static final String BUCKET_NAME = "s3-firehose";
+ private static final String STREAM_NAME = "s3-stream";
+ private static final int NUMBER_OF_ELEMENTS = 92;
+ private StreamExecutionEnvironment env;
+
+ private SdkHttpClient httpClient;
+ private S3Client s3Client;
+ private FirehoseClient firehoseClient;
+ private IamClient iamClient;
+
+ @Container
+ private static LocalstackContainer mockFirehoseContainer =
+ new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));
+
+ @BeforeEach
+ void setup() {
+ System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+ httpClient = AWSServicesTestUtils.createHttpClient();
+ s3Client = createS3Client(mockFirehoseContainer.getEndpoint(), httpClient);
+ firehoseClient = createFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient);
+ iamClient = createIamClient(mockFirehoseContainer.getEndpoint(), httpClient);
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ @AfterEach
+ void teardown() {
+ System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+ }
+
+ @Test
+ void firehoseSinkWritesCorrectDataToMockAWSServices() throws Exception {
+ LOG.info("1 - Creating the bucket for Firehose to deliver into...");
+ createBucket(s3Client, BUCKET_NAME);
+ LOG.info("2 - Creating the IAM Role for Firehose to write into the s3 bucket...");
+ createIAMRole(iamClient, ROLE_NAME);
+ LOG.info("3 - Creating the Firehose delivery stream...");
+ createDeliveryStream(STREAM_NAME, BUCKET_NAME, ROLE_ARN, firehoseClient);
+
+ KinesisFirehoseSink<String> kdsSink =
+ KinesisFirehoseSink.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .setDeliveryStreamName(STREAM_NAME)
+ .setMaxBatchSize(1)
+ .setFirehoseClientProperties(
+ createConfig(mockFirehoseContainer.getEndpoint()))
+ .build();
+
+ KinesisFirehoseTestUtils.getSampleDataGenerator(env, NUMBER_OF_ELEMENTS).sinkTo(kdsSink);
+ env.execute("Integration Test");
+
+ List<S3Object> objects =
+ listBucketObjects(
+ createS3Client(mockFirehoseContainer.getEndpoint(), httpClient),
+ BUCKET_NAME);
+ assertThat(objects.size()).isEqualTo(NUMBER_OF_ELEMENTS);
+ assertThat(
+ readObjectsFromS3Bucket(
+ s3Client,
+ objects,
+ BUCKET_NAME,
+ response -> new String(response.asByteArrayUnsafe())))
+ .containsAll(KinesisFirehoseTestUtils.getSampleData(NUMBER_OF_ELEMENTS));
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
new file mode 100644
index 0000000..40e9ace
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSink}. */
+class KinesisFirehoseSinkTest {
+
+ private static final ElementConverter<String, Record> elementConverter =
+ KinesisFirehoseSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ @Test
+ void deliveryStreamNameMustNotBeNull() {
+ Assertions.assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(
+ () ->
+ new KinesisFirehoseSink<>(
+ elementConverter,
+ 500,
+ 16,
+ 10000,
+ 4 * 1024 * 1024L,
+ 5000L,
+ 1000 * 1024L,
+ false,
+ null,
+ new Properties()))
+ .withMessageContaining(
+ "The delivery stream name must not be null when initializing the KDF Sink.");
+ }
+
+ @Test
+ void deliveryStreamNameMustNotBeEmpty() {
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ new KinesisFirehoseSink<>(
+ elementConverter,
+ 500,
+ 16,
+ 10000,
+ 4 * 1024 * 1024L,
+ 5000L,
+ 1000 * 1024L,
+ false,
+ "",
+ new Properties()))
+ .withMessageContaining(
+ "The delivery stream name must be set when initializing the KDF Sink.");
+ }
+
+ @Test
+ void firehoseSinkFailsWhenAccessKeyIdIsNotProvided() {
+ Properties properties = createConfig("https://non-exisitent-location");
+ properties.setProperty(
+ AWS_CREDENTIALS_PROVIDER, AWSConfigConstants.CredentialProvider.BASIC.toString());
+ properties.remove(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER));
+ firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+ properties, "Please set values for AWS Access Key ID");
+ }
+
+ @Test
+ void firehoseSinkFailsWhenRegionIsNotProvided() {
+ Properties properties = createConfig("https://non-exisitent-location");
+ properties.remove(AWS_REGION);
+ firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+ properties, "region must not be null.");
+ }
+
+ @Test
+ void firehoseSinkFailsWhenUnableToConnectToRemoteService() {
+ Properties properties = createConfig("https://non-exisitent-location");
+ properties.remove(TRUST_ALL_CERTIFICATES);
+ firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+ properties,
+ "Received an UnknownHostException when attempting to interact with a service.");
+ }
+
+ private void firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
+ Properties properties, String errorMessage) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ KinesisFirehoseSink<String> kdsSink =
+ KinesisFirehoseSink.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .setDeliveryStreamName("non-existent-stream")
+ .setMaxBatchSize(1)
+ .setFirehoseClientProperties(properties)
+ .build();
+
+ KinesisFirehoseTestUtils.getSampleDataGenerator(env, 10).sinkTo(kdsSink);
+
+ Assertions.assertThatExceptionOfType(JobExecutionException.class)
+ .isThrownBy(() -> env.execute("Integration Test"))
+ .havingCause()
+ .havingCause()
+ .withMessageContaining(errorMessage);
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
new file mode 100644
index 0000000..29160f7
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.concurrent.CompletionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkWriter}. */
+public class KinesisFirehoseSinkWriterTest {
+
+ private KinesisFirehoseSinkWriter<String> sinkWriter;
+
+ private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER =
+ KinesisFirehoseSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ @BeforeEach
+ void setup() {
+ TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+ Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
+ sinkWriter =
+ new KinesisFirehoseSinkWriter<>(
+ ELEMENT_CONVERTER_PLACEHOLDER,
+ sinkInitContext,
+ 50,
+ 16,
+ 10000,
+ 4 * 1024 * 1024,
+ 5000,
+ 1000 * 1024,
+ true,
+ "streamName",
+ sinkProperties);
+ }
+
+ @Test
+ void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
+ String testString = "{many hands make light work;";
+ Record record = Record.builder().data(SdkBytes.fromUtf8String(testString)).build();
+ assertThat(sinkWriter.getSizeInBytes(record))
+ .isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);
+ }
+
+ @Test
+ void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
+ throws IOException, InterruptedException {
+ TestSinkInitContext ctx = new TestSinkInitContext();
+ KinesisFirehoseSink<String> kinesisFirehoseSink =
+ new KinesisFirehoseSink<>(
+ ELEMENT_CONVERTER_PLACEHOLDER,
+ 12,
+ 16,
+ 10000,
+ 4 * 1024 * 1024L,
+ 5000L,
+ 1000 * 1024L,
+ true,
+ "test-stream",
+ AWSServicesTestUtils.createConfig("https://localhost"));
+ SinkWriter<String> writer = kinesisFirehoseSink.createWriter(ctx);
+
+ for (int i = 0; i < 12; i++) {
+ writer.write("data_bytes", null);
+ }
+ assertThatExceptionOfType(CompletionException.class)
+ .isThrownBy(() -> writer.flush(true))
+ .withCauseInstanceOf(SdkClientException.class)
+ .withMessageContaining(
+ "Unable to execute HTTP request: Connection refused: localhost/127.0.0.1:443");
+ assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12);
+ assertThat(ctx.metricGroup().getNumRecordsSendErrorsCounter().getCount()).isEqualTo(12);
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializerTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializerTest.java
new file mode 100644
index 0000000..5264e97
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseStateSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.firehose.model.Record;
+
+import java.io.IOException;
+
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;
+
+/** Test class for {@link KinesisFirehoseStateSerializer}. */
+class KinesisFirehoseStateSerializerTest {
+
+ private static final ElementConverter<String, Record> ELEMENT_CONVERTER =
+ KinesisFirehoseSinkElementConverter.<String>builder()
+ .setSerializationSchema(new SimpleStringSchema())
+ .build();
+
+ @Test
+ void testSerializeAndDeserialize() throws IOException {
+ BufferedRequestState<Record> expectedState =
+ getTestState(ELEMENT_CONVERTER, this::getRequestSize);
+
+ KinesisFirehoseStateSerializer serializer = new KinesisFirehoseStateSerializer();
+ BufferedRequestState<Record> actualState =
+ serializer.deserialize(1, serializer.serialize(expectedState));
+
+ assertThatBufferStatesAreEqual(actualState, expectedState);
+ }
+
+ private int getRequestSize(Record requestEntry) {
+ return requestEntry.data().asByteArrayUnsafe().length;
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
new file mode 100644
index 0000000..f4dee62
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.sink.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
+import software.amazon.awssdk.services.firehose.model.ExtendedS3DestinationConfiguration;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the Localstack container.
+ */
+public class KinesisFirehoseTestUtils {
+
+ private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
+
+ public static FirehoseClient createFirehoseClient(String endpoint, SdkHttpClient httpClient) {
+ return AWSServicesTestUtils.createAwsSyncClient(
+ endpoint, httpClient, FirehoseClient.builder());
+ }
+
+ public static void createDeliveryStream(
+ String deliveryStreamName,
+ String bucketName,
+ String roleARN,
+ FirehoseClient firehoseClient) {
+ ExtendedS3DestinationConfiguration s3Config =
+ ExtendedS3DestinationConfiguration.builder()
+ .bucketARN(bucketName)
+ .roleARN(roleARN)
+ .build();
+ CreateDeliveryStreamRequest request =
+ CreateDeliveryStreamRequest.builder()
+ .deliveryStreamName(deliveryStreamName)
+ .extendedS3DestinationConfiguration(s3Config)
+ .deliveryStreamType(DeliveryStreamType.DIRECT_PUT)
+ .build();
+
+ firehoseClient.createDeliveryStream(request);
+ }
+
+ public static DataStream<String> getSampleDataGenerator(
+ StreamExecutionEnvironment env, int endValue) {
+ return env.fromSequence(1, endValue)
+ .map(Object::toString)
+ .returns(String.class)
+ .map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", data)));
+ }
+
+ public static List<String> getSampleData(int endValue) throws JsonProcessingException {
+ List<String> expectedElements = new ArrayList<>();
+ for (int i = 1; i <= endValue; i++) {
+ expectedElements.add(
+ MAPPER.writeValueAsString(ImmutableMap.of("data", String.valueOf(i))));
+ }
+ return expectedElements;
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
new file mode 100644
index 0000000..532b3b6
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.firehose.table;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TableOptionsBuilder;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+
+/**
+ * Test for {@link KinesisFirehoseDynamicSink} created by {@link
+ * KinesisFirehoseDynamicTableFactory}.
+ */
+class KinesisFirehoseDynamicTableFactoryTest {
+ private static final String DELIVERY_STREAM_NAME = "myDeliveryStream";
+
+ @Test
+ void testGoodTableSink() {
+ ResolvedSchema sinkSchema = defaultSinkSchema();
+ Map<String, String> sinkOptions = defaultTableOptions().build();
+
+ // Construct actual DynamicTableSink using FactoryUtil
+ KinesisFirehoseDynamicSink actualSink =
+ (KinesisFirehoseDynamicSink) createTableSink(sinkSchema, sinkOptions);
+
+ // Construct expected DynamicTableSink using factory under test
+ KinesisFirehoseDynamicSink expectedSink =
+ new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder()
+ .setConsumedDataType(sinkSchema.toPhysicalRowDataType())
+ .setDeliveryStream(DELIVERY_STREAM_NAME)
+ .setFirehoseClientProperties(defaultSinkProperties())
+ .setEncodingFormat(new TestFormatFactory.EncodingFormatMock(","))
+ .build();
+
+ Assertions.assertThat(actualSink).isEqualTo(expectedSink);
+
+ // verify the produced sink
+ DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
+ actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+ Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
+ Assertions.assertThat(sinkFunction).isInstanceOf(KinesisFirehoseSink.class);
+ }
+
+ @Test
+ void testGoodTableSinkWithSinkOptions() {
+ ResolvedSchema sinkSchema = defaultSinkSchema();
+ Map<String, String> sinkOptions = defaultTableOptionsWithSinkOptions().build();
+
+ // Construct actual DynamicTableSink using FactoryUtil
+ KinesisFirehoseDynamicSink actualSink =
+ (KinesisFirehoseDynamicSink) createTableSink(sinkSchema, sinkOptions);
+
+ // Construct expected DynamicTableSink using factory under test
+ KinesisFirehoseDynamicSink expectedSink =
+ getDefaultSinkBuilder()
+ .setConsumedDataType(sinkSchema.toPhysicalRowDataType())
+ .setDeliveryStream(DELIVERY_STREAM_NAME)
+ .setFirehoseClientProperties(defaultSinkProperties())
+ .setEncodingFormat(new TestFormatFactory.EncodingFormatMock(","))
+ .build();
+
+ Assertions.assertThat(actualSink).isEqualTo(expectedSink);
+
+ // verify the produced sink
+ DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
+ actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+ Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
+ Assertions.assertThat(sinkFunction).isInstanceOf(KinesisFirehoseSink.class);
+ }
+
+ private ResolvedSchema defaultSinkSchema() {
+ return ResolvedSchema.of(
+ Column.physical("name", DataTypes.STRING()),
+ Column.physical("curr_id", DataTypes.BIGINT()),
+ Column.physical("time", DataTypes.TIMESTAMP(3)));
+ }
+
+ private TableOptionsBuilder defaultTableOptionsWithSinkOptions() {
+ return defaultTableOptions()
+ .withTableOption("sink.fail-on-error", "true")
+ .withTableOption("sink.batch.max-size", "100")
+ .withTableOption("sink.requests.max-inflight", "100")
+ .withTableOption("sink.requests.max-buffered", "100")
+ .withTableOption("sink.flush-buffer.size", "1000")
+ .withTableOption("sink.flush-buffer.timeout", "1000");
+ }
+
+ private TableOptionsBuilder defaultTableOptions() {
+ String connector = KinesisFirehoseDynamicTableFactory.IDENTIFIER;
+ String format = TestFormatFactory.IDENTIFIER;
+ return new TableOptionsBuilder(connector, format)
+ // default table options
+ .withTableOption(
+ KinesisFirehoseConnectorOptions.DELIVERY_STREAM, DELIVERY_STREAM_NAME)
+ .withTableOption("aws.region", "us-west-2")
+ .withTableOption("aws.credentials.provider", "BASIC")
+ .withTableOption("aws.credentials.basic.accesskeyid", "ververicka")
+ .withTableOption(
+ "aws.credentials.basic.secretkey",
+ "SuperSecretSecretSquirrel") // default format options
+ .withFormatOption(TestFormatFactory.DELIMITER, ",")
+ .withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true");
+ }
+
+ private KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder getDefaultSinkBuilder() {
+ return new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder()
+ .setFailOnError(true)
+ .setMaxBatchSize(100)
+ .setMaxInFlightRequests(100)
+ .setMaxBufferSizeInBytes(1000)
+ .setMaxBufferedRequests(100)
+ .setMaxTimeInBufferMS(1000);
+ }
+
+ private Properties defaultSinkProperties() {
+ return new Properties() {
+ {
+ setProperty("aws.region", "us-west-2");
+ setProperty("aws.credentials.provider", "BASIC");
+ setProperty("aws.credentials.provider.basic.accesskeyid", "ververicka");
+ setProperty(
+ "aws.credentials.provider.basic.secretkey", "SuperSecretSecretSquirrel");
+ }
+ };
+ }
+}
diff --git a/flink-connector-aws-kinesis-firehose/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-connector-aws-kinesis-firehose/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 0000000..2899913
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file
diff --git a/flink-connector-aws-kinesis-firehose/src/test/resources/archunit.properties b/flink-connector-aws-kinesis-firehose/src/test/resources/archunit.properties
new file mode 100644
index 0000000..15be88c
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/resources/archunit.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# By default we allow removing existing violations, but fail when new violations are added.
+freeze.store.default.allowStoreUpdate=true
+
+# Enable this if a new (frozen) rule has been added in order to create the initial store and record the existing violations.
+#freeze.store.default.allowStoreCreation=true
+
+# Enable this to add allow new violations to be recorded.
+# NOTE: Adding new violations should be avoided when possible. If the rule was correct to flag a new
+# violation, please try to avoid creating the violation. If the violation was created due to a
+# shortcoming of the rule, file a JIRA issue so the rule can be improved.
+#freeze.refreeze=true
+
+freeze.store.default.path=archunit-violations
diff --git a/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties b/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..c4fa187
--- /dev/null
+++ b/flink-connector-aws-kinesis-firehose/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-sql-connector-aws-kinesis-firehose/pom.xml b/flink-sql-connector-aws-kinesis-firehose/pom.xml
new file mode 100644
index 0000000..8a587ae
--- /dev/null
+++ b/flink-sql-connector-aws-kinesis-firehose/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-parent</artifactId>
+ <version>4.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+ <name>Flink : Connectors : AWS : SQL : Amazon Kinesis Data Firehose</name>
+
+ <properties>
+ <japicmp.skip>true</japicmp.skip>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>org.apache.flink:flink-connector-base</include>
+ <include>org.apache.flink:flink-connector-aws-base</include>
+ <include>org.apache.flink:flink-connector-aws-kinesis-firehose</include>
+ <include>software.amazon.awssdk:*</include>
+ <include>io.netty:*</include>
+ <include>org.reactivestreams:*</include>
+ <include>org.apache.httpcomponents:*</include>
+ <include>com.typesafe.netty:*</include>
+ <include>commons-logging:commons-logging</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>software.amazon</pattern>
+ <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.software.amazon
+ </shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.io.netty
+ </shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.reactivestreams</pattern>
+ <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.org.reactivestreams
+ </shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.org.apache.http
+ </shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.typesafe.netty</pattern>
+ <shadedPattern>org.apache.flink.connector.firehose.sink.shaded.com.typesafe.netty
+ </shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-sql-connector-aws-kinesis-firehose/src/main/resources/META-INF/NOTICE b/flink-sql-connector-aws-kinesis-firehose/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..7cf7302
--- /dev/null
+++ b/flink-sql-connector-aws-kinesis-firehose/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,48 @@
+flink-sql-connector-aws-kinesis-firehose
+
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- software.amazon.awssdk:firehose:2.17.247
+- software.amazon.awssdk:aws-json-protocol:2.17.247
+- software.amazon.awssdk:protocol-core:2.17.247
+- software.amazon.awssdk:profiles:2.17.247
+- software.amazon.awssdk:sdk-core:2.17.247
+- software.amazon.awssdk:auth:2.17.247
+- software.amazon.awssdk:http-client-spi:2.17.247
+- software.amazon.awssdk:regions:2.17.247
+- software.amazon.awssdk:annotations:2.17.247
+- software.amazon.awssdk:utils:2.17.247
+- software.amazon.awssdk:aws-core:2.17.247
+- software.amazon.awssdk:metrics-spi:2.17.247
+- software.amazon.awssdk:apache-client:2.17.247
+- software.amazon.awssdk:netty-nio-client:2.17.247
+- software.amazon.awssdk:sts:2.17.247
+- software.amazon.awssdk:aws-query-protocol:2.17.247
+- software.amazon.awssdk:json-utils:2.17.247
+- software.amazon.awssdk:third-party-jackson-core:2.17.247
+- io.netty:netty-codec-http:4.1.70.Final
+- io.netty:netty-codec-http2:4.1.70.Final
+- io.netty:netty-codec:4.1.70.Final
+- io.netty:netty-transport:4.1.70.Final
+- io.netty:netty-resolver:4.1.70.Final
+- io.netty:netty-common:4.1.70.Final
+- io.netty:netty-buffer:4.1.70.Final
+- io.netty:netty-handler:4.1.70.Final
+- io.netty:netty-transport-native-epoll:linux-x86_64:4.1.70.Final
+- io.netty:netty-transport-native-unix-common:4.1.70.Final
+- io.netty:netty-transport-classes-epoll:4.1.70.Final
+- com.typesafe.netty:netty-reactive-streams-http:2.0.5
+- com.typesafe.netty:netty-reactive-streams:2.0.5
+- org.apache.httpcomponents:httpclient:4.5.13
+- org.apache.httpcomponents:httpcore:4.4.14
+- commons-logging:commons-logging:1.1.3
+
+This project bundles the following dependencies under the Creative Commons Zero license (https://creativecommons.org/publicdomain/zero/1.0/).
+
+- org.reactivestreams:reactive-streams:1.0.3
+
diff --git a/pom.xml b/pom.xml
index 0dc0342..d701652 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,6 +71,8 @@ under the License.
<module>flink-connector-aws-base</module>
<module>flink-connector-dynamodb</module>
<module>flink-sql-connector-dynamodb</module>
+ <module>flink-connector-aws-kinesis-firehose</module>
+ <module>flink-sql-connector-aws-kinesis-firehose</module>
</modules>
<dependencies>