Posted to commits@flink.apache.org by da...@apache.org on 2022/12/05 10:35:49 UTC

[flink-connector-aws] 02/04: [FLINK-29908][Connectors/Firehose] Externalize and configure E2E tests for Kinesis connector v2

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git

commit d9a3b05a56232f02ce007321e6385df5deb2dbba
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Sat Dec 3 19:56:37 2022 +0000

    [FLINK-29908][Connectors/Firehose] Externalize and configure E2E tests for Kinesis connector v2
---
 .github/workflows/ci.yml                           |   4 +-
 .../pom.xml                                        |   7 -
 .../pom.xml                                        |  74 +++---
 .../table/test/KinesisStreamsTableApiIT.java       | 271 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 +++
 .../src/test/resources/send-orders.sql             |  36 +++
 flink-connector-aws-e2e-tests/pom.xml              |   9 +
 7 files changed, 374 insertions(+), 55 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 76962f3..f68d46d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -59,12 +59,12 @@ jobs:
         id: cache-flink
         with:
           path: ${{ env.FLINK_CACHE_DIR }}
-          key: ${{ inputs.flink_url }}
+          key: ${{ env.FLINK_URL }}
 
       - name: Download Flink binary
         working-directory: ${{ env.FLINK_CACHE_DIR }}
         if: steps.cache-flink.outputs.cache-hit != 'true'
-        run: wget -q -c ${{ inputs.flink_url }} -O - | tar -xz
+        run: wget -q -c ${{ env.FLINK_URL }} -O - | tar -xz
 
       - name: Compile and test flink-connector-dynamodb
         run: |
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
index 8c9a1c3..b2e0878 100644
--- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
@@ -48,12 +48,6 @@
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-test-utils</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
@@ -124,7 +118,6 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.22.1</version>
                 <configuration>
                     <systemPropertyVariables>
                         <!-- Required for Kinesalite. -->
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
similarity index 68%
copy from flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
copy to flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
index 8c9a1c3..1a5d793 100644
--- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
@@ -29,70 +29,53 @@
 
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>flink-connector-aws-kinesis-firehose-e2e-tests</artifactId>
-    <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Firehose e2e tests</name>
+    <artifactId>flink-connector-aws-kinesis-streams-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Streams e2e tests</name>
     <packaging>jar</packaging>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
-            <version>${project.version}</version>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
             <scope>test</scope>
-            <type>test-jar</type>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-aws-base</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
             <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-test-utils</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>testcontainers</artifactId>
+            <artifactId>flink-connector-aws-kinesis-streams</artifactId>
+            <version>${project.version}</version>
             <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>software.amazon.awssdk</groupId>
-            <artifactId>s3</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>software.amazon.awssdk</groupId>
-            <artifactId>iam</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
+
     <build>
         <plugins>
             <plugin>
@@ -111,9 +94,9 @@
                     <artifactItems>
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
-                            <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+                            <artifactId>flink-sql-connector-aws-kinesis-streams</artifactId>
                             <version>${project.version}</version>
-                            <destFileName>sql-kinesis-firehose.jar</destFileName>
+                            <destFileName>sql-kinesis-streams.jar</destFileName>
                             <type>jar</type>
                             <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
@@ -124,17 +107,16 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.22.1</version>
                 <configuration>
                     <systemPropertyVariables>
                         <!-- Required for Kinesalite. -->
                         <!-- Including shaded and non-shaded conf to support test running from Maven and IntelliJ -->
                         <com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
                         <com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
-                        <org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCbor>true
-                        </org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCbor>
-                        <org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking>true
-                        </org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking>
+                        <org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor>true
+                        </org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor>
+                        <org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking>true
+                        </org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking>
                     </systemPropertyVariables>
                 </configuration>
             </plugin>
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
new file mode 100644
index 0000000..6c0a944
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.table.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/** End-to-end test for Kinesis Streams Table API Sink using Kinesalite. */
+public class KinesisStreamsTableApiIT {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisStreamsTableApiIT.class);
+
+    private static final String ORDERS_STREAM = "orders";
+    private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
+    private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000";
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
+    private SdkHttpClient httpClient;
+    private KinesisClient kinesisClient;
+
+    private final Path sqlConnectorKinesisJar =
+            ResourceTestUtils.getResource(".*kinesis-streams.jar");
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
+
+    @ClassRule
+    public static final KinesaliteContainer KINESALITE =
+            new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE))
+                    .withNetwork(network)
+                    .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
+
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
+                    .environmentVariable("AWS_CBOR_DISABLE", "1")
+                    .environmentVariable(
+                            "FLINK_ENV_JAVA_OPTS",
+                            "-Dorg.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
+                    .network(network)
+                    .logger(LOGGER)
+                    .dependsOn(KINESALITE)
+                    .build();
+
+    public static final FlinkContainers FLINK =
+            FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    @BeforeClass
+    public static void setupFlink() throws Exception {
+        FLINK.start();
+    }
+
+    @AfterClass
+    public static void stopFlink() {
+        FLINK.stop();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
+        httpClient = AWSServicesTestUtils.createHttpClient();
+        kinesisClient = KINESALITE.createHostClient(httpClient);
+        prepareStream(ORDERS_STREAM);
+    }
+
+    @After
+    public void teardown() {
+        System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
+        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+    }
+
+    @Test
+    public void testTableApiSourceAndSink() throws Exception {
+        executeSqlStatements(readSqlFile("send-orders.sql"));
+        List<Order> expected =
+                ImmutableList.of(
+                        new Order("A", 10),
+                        new Order("B", 12),
+                        new Order("C", 14),
+                        new Order("D", 16),
+                        new Order("E", 18));
+        // result order is not guaranteed
+        List<Order> result = readAllOrdersFromKinesis();
+        Assertions.assertThat(result).containsAll(expected);
+    }
+
+    private void prepareStream(String streamName) throws Exception {
+        final RateLimiter rateLimiter =
+                RateLimiterBuilder.newBuilder()
+                        .withRate(1, SECONDS)
+                        .withConstantThroughput()
+                        .build();
+
+        kinesisClient.createStream(
+                CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
+
+        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(1));
+        while (!rateLimiter.getWhenReady(() -> streamExists(streamName))) {
+            if (deadline.isOverdue()) {
+                throw new RuntimeException("Failed to create stream within time");
+            }
+        }
+    }
+
+    private boolean streamExists(final String streamName) {
+        try {
+            return kinesisClient
+                            .describeStream(
+                                    DescribeStreamRequest.builder().streamName(streamName).build())
+                            .streamDescription()
+                            .streamStatus()
+                    == StreamStatus.ACTIVE;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    private List<Order> readAllOrdersFromKinesis() throws Exception {
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+        List<Order> orders;
+        do {
+            orders =
+                    readMessagesFromStream(
+                            recordBytes -> fromJson(new String(recordBytes), Order.class));
+
+        } while (deadline.hasTimeLeft() && orders.size() < 5);
+
+        return orders;
+    }
+
+    private void executeSqlStatements(final List<String> sqlLines) throws Exception {
+        FLINK.submitSQLJob(
+                new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                        .addJars(sqlConnectorKinesisJar)
+                        .build());
+    }
+
+    private List<String> readSqlFile(final String resourceName) throws Exception {
+        return Files.readAllLines(Paths.get(getClass().getResource("/" + resourceName).toURI()));
+    }
+
+    private <T> T fromJson(final String json, final Class<T> type) {
+        try {
+            return OBJECT_MAPPER.readValue(json, type);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Test Failure.", e);
+        }
+    }
+
+    private <T> List<T> readMessagesFromStream(Function<byte[], T> deserialiser) throws Exception {
+        String shardIterator =
+                kinesisClient
+                        .getShardIterator(
+                                GetShardIteratorRequest.builder()
+                                        .shardId(DEFAULT_FIRST_SHARD_NAME)
+                                        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
+                                        .streamName(KinesisStreamsTableApiIT.ORDERS_STREAM)
+                                        .build())
+                        .shardIterator();
+
+        List<Record> records =
+                kinesisClient
+                        .getRecords(
+                                GetRecordsRequest.builder().shardIterator(shardIterator).build())
+                        .records();
+        List<T> messages = new ArrayList<>();
+        records.forEach(record -> messages.add(deserialiser.apply(record.data().asByteArray())));
+        return messages;
+    }
+
+    /** POJO class for orders used by e2e test. */
+    public static class Order {
+        private final String code;
+        private final int quantity;
+
+        public Order(
+                @JsonProperty("code") final String code, @JsonProperty("quantity") int quantity) {
+            this.code = code;
+            this.quantity = quantity;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            Order order = (Order) o;
+            return quantity == order.quantity && Objects.equals(code, order.code);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(code, quantity);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("Order{code: %s, quantity: %d}", code, quantity);
+        }
+    }
+}
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/send-orders.sql b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/send-orders.sql
new file mode 100644
index 0000000..29f2c88
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/send-orders.sql
@@ -0,0 +1,36 @@
+--/*
+-- * Licensed to the Apache Software Foundation (ASF) under one
+-- * or more contributor license agreements.  See the NOTICE file
+-- * distributed with this work for additional information
+-- * regarding copyright ownership.  The ASF licenses this file
+-- * to you under the Apache License, Version 2.0 (the
+-- * "License"); you may not use this file except in compliance
+-- * with the License.  You may obtain a copy of the License at
+-- *
+-- *     http://www.apache.org/licenses/LICENSE-2.0
+-- *
+-- * Unless required by applicable law or agreed to in writing, software
+-- * distributed under the License is distributed on an "AS IS" BASIS,
+-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- * See the License for the specific language governing permissions and
+-- * limitations under the License.
+-- */
+
+CREATE TABLE orders (
+  `code` STRING,
+  `quantity` BIGINT
+) WITH (
+  'connector' = 'kinesis',
+  'stream' = 'orders',
+  'aws.region' = 'us-east-1',
+  'aws.endpoint' = 'https://kinesalite:4567',
+  'aws.credentials.provider' = 'BASIC',
+  'aws.credentials.basic.accesskeyid' = 'access key',
+  'aws.credentials.basic.secretkey' ='secret key',
+  'aws.trust.all.certificates' = 'true',
+  'sink.http-client.protocol.version' = 'HTTP1_1',
+  'sink.batch.max-size' = '1',
+  'format' = 'json'
+);
+
+INSERT INTO orders VALUES ('A', 10),('B', 12),('C', 14),('D', 16),('E', 18);
diff --git a/flink-connector-aws-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/pom.xml
index 0835d85..b1266aa 100644
--- a/flink-connector-aws-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/pom.xml
@@ -40,8 +40,17 @@ under the License.
 
     <modules>
         <module>flink-connector-aws-kinesis-firehose-e2e-tests</module>
+        <module>flink-connector-aws-kinesis-streams-e2e-tests</module>
     </modules>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
     <dependencyManagement>
         <!-- For dependency convergence -->
         <dependencies>