You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/12/05 10:35:49 UTC
[flink-connector-aws] 02/04: [FLINK-29908][Connectors/Firehose] Externalize and configure E2E tests for Kinesis connector v2
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit d9a3b05a56232f02ce007321e6385df5deb2dbba
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Sat Dec 3 19:56:37 2022 +0000
[FLINK-29908][Connectors/Firehose] Externalize and configure E2E tests for Kinesis connector v2
---
.github/workflows/ci.yml | 4 +-
.../pom.xml | 7 -
.../pom.xml | 74 +++---
.../table/test/KinesisStreamsTableApiIT.java | 271 +++++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 28 +++
.../src/test/resources/send-orders.sql | 36 +++
flink-connector-aws-e2e-tests/pom.xml | 9 +
7 files changed, 374 insertions(+), 55 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 76962f3..f68d46d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -59,12 +59,12 @@ jobs:
id: cache-flink
with:
path: ${{ env.FLINK_CACHE_DIR }}
- key: ${{ inputs.flink_url }}
+ key: ${{ env.FLINK_URL }}
- name: Download Flink binary
working-directory: ${{ env.FLINK_CACHE_DIR }}
if: steps.cache-flink.outputs.cache-hit != 'true'
- run: wget -q -c ${{ inputs.flink_url }} -O - | tar -xz
+ run: wget -q -c ${{ env.FLINK_URL }} -O - | tar -xz
- name: Compile and test flink-connector-dynamodb
run: |
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
index 8c9a1c3..b2e0878 100644
--- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
@@ -48,12 +48,6 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-test-utils</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
@@ -124,7 +118,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.22.1</version>
<configuration>
<systemPropertyVariables>
<!-- Required for Kinesalite. -->
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
similarity index 68%
copy from flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
copy to flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
index 8c9a1c3..1a5d793 100644
--- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/pom.xml
@@ -29,70 +29,53 @@
<modelVersion>4.0.0</modelVersion>
- <artifactId>flink-connector-aws-kinesis-firehose-e2e-tests</artifactId>
- <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Firehose e2e tests</name>
+ <artifactId>flink-connector-aws-kinesis-streams-e2e-tests</artifactId>
+ <name>Flink : Connectors : AWS : E2E Tests : Amazon Kinesis Streams e2e tests</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-aws-kinesis-firehose</artifactId>
- <version>${project.version}</version>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
<scope>test</scope>
- <type>test-jar</type>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-base</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
+ <exclusions>
+ <exclusion>
+ <groupId>com.typesafe.netty</groupId>
+ <artifactId>netty-reactive-streams-http</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-test-utils</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>testcontainers</artifactId>
+ <artifactId>flink-connector-aws-kinesis-streams</artifactId>
+ <version>${project.version}</version>
<scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
+ <type>test-jar</type>
+ <exclusions>
+ <exclusion>
+ <groupId>com.typesafe.netty</groupId>
+ <artifactId>netty-reactive-streams-http</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
- <groupId>software.amazon.awssdk</groupId>
- <artifactId>s3</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>software.amazon.awssdk</groupId>
- <artifactId>iam</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
+
<build>
<plugins>
<plugin>
@@ -111,9 +94,9 @@
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+ <artifactId>flink-sql-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
- <destFileName>sql-kinesis-firehose.jar</destFileName>
+ <destFileName>sql-kinesis-streams.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
@@ -124,17 +107,16 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.22.1</version>
<configuration>
<systemPropertyVariables>
<!-- Required for Kinesalite. -->
<!-- Including shaded and non-shaded conf to support test running from Maven and IntelliJ -->
<com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
<com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
- <org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCbor>true
- </org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCbor>
- <org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking>true
- </org.apache.flink.kinesis-firehose.shaded.com.amazonaws.sdk.disableCertChecking>
+ <org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor>true
+ </org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCbor>
+ <org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking>true
+ </org.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking>
</systemPropertyVariables>
</configuration>
</plugin>
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
new file mode 100644
index 0000000..6c0a944
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.table.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/** End-to-end test for Kinesis Streams Table API Sink using Kinesalite. */
+public class KinesisStreamsTableApiIT {
+
+ // Logger for this test class; also passed to the Testcontainers settings below.
+ private static final Logger LOGGER = LoggerFactory.getLogger(KinesisStreamsTableApiIT.class);
+
+ // Name of the stream created in setUp() and read back in readMessagesFromStream().
+ private static final String ORDERS_STREAM = "orders";
+ // Network alias under which the Kinesalite container is registered on the shared network.
+ private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
+ // Shard id used when reading records back; prepareStream() creates the stream with one shard.
+ private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000";
+ // Mapper used by fromJson() to turn record payloads into Order instances.
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
+ // Host-side clients; assigned in setUp() and closed in teardown().
+ private SdkHttpClient httpClient;
+ private KinesisClient kinesisClient;
+
+ // Connector jar attached to the submitted SQL job.
+ // NOTE(review): presumably located by matching the file-name regex - confirm against
+ // ResourceTestUtils.
+ private final Path sqlConnectorKinesisJar =
+ ResourceTestUtils.getResource(".*kinesis-streams.jar");
+ // Network shared by the Kinesalite container and the Flink containers.
+ private static final Network network = Network.newNetwork();
+
+ // Class-level timeout of 10 minutes.
+ @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
+
+ // Kinesalite container attached to the shared network under the alias defined above.
+ @ClassRule
+ public static final KinesaliteContainer KINESALITE =
+ new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE))
+ .withNetwork(network)
+ .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
+
+ // Settings for the Flink containers: sets AWS_CBOR_DISABLE and the JVM options that disable
+ // certificate checking for the shaded SDK and CBOR, joins the shared network, and declares a
+ // dependency on the Kinesalite container.
+ public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+ TestcontainersSettings.builder()
+ .environmentVariable("AWS_CBOR_DISABLE", "1")
+ .environmentVariable(
+ "FLINK_ENV_JAVA_OPTS",
+ "-Dorg.apache.flink.kinesis-streams.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
+ .network(network)
+ .logger(LOGGER)
+ .dependsOn(KINESALITE)
+ .build();
+
+ // Flink containers built from the settings above; started and stopped explicitly in
+ // setupFlink() and stopFlink() rather than through a rule annotation.
+ public static final FlinkContainers FLINK =
+ FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    /** Starts the shared Flink containers once, before the tests of this class run. */
+    @BeforeClass
+    public static void setupFlink() throws Exception {
+        FLINK.start();
+    }
+
+    /** Stops the shared Flink containers once the tests of this class have finished. */
+    @AfterClass
+    public static void stopFlink() {
+        FLINK.stop();
+    }
+
+    /**
+     * Prepares the host-side environment for a test: switches the CBOR system property to
+     * "false", creates the HTTP and Kinesis clients for the Kinesalite container, and creates the
+     * orders stream.
+     */
+    @Before
+    public void setUp() throws Exception {
+        final String cborProperty = SdkSystemSetting.CBOR_ENABLED.property();
+        System.setProperty(cborProperty, "false");
+
+        httpClient = AWSServicesTestUtils.createHttpClient();
+        kinesisClient = KINESALITE.createHostClient(httpClient);
+
+        prepareStream(ORDERS_STREAM);
+    }
+
+    /** Clears the CBOR system property and closes the HTTP and Kinesis clients. */
+    @After
+    public void teardown() {
+        final String cborProperty = SdkSystemSetting.CBOR_ENABLED.property();
+        System.clearProperty(cborProperty);
+
+        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+    }
+
+    /**
+     * Submits send-orders.sql to the Flink cluster and verifies that every order inserted by the
+     * script can be read back from the orders stream.
+     */
+    @Test
+    public void testTableApiSourceAndSink() throws Exception {
+        final List<String> statements = readSqlFile("send-orders.sql");
+        executeSqlStatements(statements);
+
+        final List<Order> expectedOrders =
+                ImmutableList.of(
+                        new Order("A", 10),
+                        new Order("B", 12),
+                        new Order("C", 14),
+                        new Order("D", 16),
+                        new Order("E", 18));
+
+        // Arrival order is not guaranteed, so only membership is asserted.
+        final List<Order> ordersInStream = readAllOrdersFromKinesis();
+        Assertions.assertThat(ordersInStream).containsAll(expectedOrders);
+    }
+
+    /**
+     * Creates a single-shard stream and waits until {@link #streamExists(String)} reports it as
+     * active.
+     *
+     * @param streamName name of the stream to create
+     * @throws RuntimeException if the stream is still not active once the one minute deadline is
+     *     overdue
+     */
+    private void prepareStream(String streamName) throws Exception {
+        // Limits the status polling below to the configured rate of one check per second.
+        final RateLimiter pollLimiter =
+                RateLimiterBuilder.newBuilder()
+                        .withRate(1, SECONDS)
+                        .withConstantThroughput()
+                        .build();
+
+        final CreateStreamRequest createRequest =
+                CreateStreamRequest.builder().streamName(streamName).shardCount(1).build();
+        kinesisClient.createStream(createRequest);
+
+        final Deadline activationDeadline = Deadline.fromNow(Duration.ofMinutes(1));
+        while (!pollLimiter.getWhenReady(() -> streamExists(streamName))) {
+            if (activationDeadline.isOverdue()) {
+                throw new RuntimeException("Failed to create stream within time");
+            }
+        }
+    }
+
+    /**
+     * Checks whether the given stream is currently reported as ACTIVE.
+     *
+     * @param streamName name of the stream to describe
+     * @return {@code true} if the describe call succeeds and the status is ACTIVE; {@code false}
+     *     otherwise, including when the call throws
+     */
+    private boolean streamExists(final String streamName) {
+        try {
+            final DescribeStreamRequest describeRequest =
+                    DescribeStreamRequest.builder().streamName(streamName).build();
+            final StreamStatus status =
+                    kinesisClient
+                            .describeStream(describeRequest)
+                            .streamDescription()
+                            .streamStatus();
+            return status == StreamStatus.ACTIVE;
+        } catch (Exception e) {
+            // Any failure counts as "not ready yet" so that the caller keeps polling.
+            return false;
+        }
+    }
+
+    /**
+     * Polls the orders stream until five orders are visible or ten seconds have elapsed,
+     * whichever comes first.
+     *
+     * <p>Record payloads are decoded explicitly as UTF-8 so the result does not depend on the
+     * platform default charset. Consecutive polls are separated by a short pause instead of
+     * re-querying Kinesalite in a tight loop.
+     *
+     * @return the orders returned by the most recent read; may hold fewer than five entries if
+     *     the deadline expired first
+     * @throws Exception if reading from the stream fails or the waiting thread is interrupted
+     */
+    private List<Order> readAllOrdersFromKinesis() throws Exception {
+        final int expectedOrderCount = 5;
+        final long pollIntervalMillis = 100L;
+        final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+
+        while (true) {
+            final List<Order> orders =
+                    readMessagesFromStream(
+                            recordBytes ->
+                                    fromJson(
+                                            new String(recordBytes, StandardCharsets.UTF_8),
+                                            Order.class));
+
+            // At least one read always happens; stop on success or once the deadline has passed.
+            if (orders.size() >= expectedOrderCount || !deadline.hasTimeLeft()) {
+                return orders;
+            }
+            Thread.sleep(pollIntervalMillis);
+        }
+    }
+
+    /**
+     * Submits the given SQL script to the Flink cluster with the Kinesis Streams SQL connector
+     * jar attached.
+     *
+     * @param sqlLines lines of the SQL script to submit
+     */
+    private void executeSqlStatements(final List<String> sqlLines) throws Exception {
+        final SQLJobSubmission submission =
+                new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                        .addJars(sqlConnectorKinesisJar)
+                        .build();
+        FLINK.submitSQLJob(submission);
+    }
+
+    /**
+     * Reads a SQL script from the root of the test classpath.
+     *
+     * @param resourceName file name of the resource, without a leading slash
+     * @return the lines of the script, in file order
+     * @throws NullPointerException with a descriptive message if the resource cannot be found
+     */
+    private List<String> readSqlFile(final String resourceName) throws Exception {
+        final String resourcePath = "/" + resourceName;
+        // getResource returns null for a missing resource; fail with a message that names it
+        // instead of an anonymous NullPointerException on toURI().
+        return Files.readAllLines(
+                Paths.get(
+                        Objects.requireNonNull(
+                                        getClass().getResource(resourcePath),
+                                        "SQL resource not found on classpath: " + resourcePath)
+                                .toURI()));
+    }
+
+    /**
+     * Deserialises a JSON document into the requested type using the shared object mapper.
+     *
+     * @param json JSON text to parse
+     * @param type target class
+     * @return the parsed instance
+     * @throws RuntimeException wrapping the parser error if the text cannot be mapped
+     */
+    private <T> T fromJson(final String json, final Class<T> type) {
+        final T parsed;
+        try {
+            parsed = OBJECT_MAPPER.readValue(json, type);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Test Failure.", e);
+        }
+        return parsed;
+    }
+
+    /**
+     * Reads the records currently available on the first shard of the orders stream, starting at
+     * TRIM_HORIZON, with a single getRecords call.
+     *
+     * @param deserialiser converts the raw bytes of one record into a message
+     * @return the converted messages, in the order the records were returned
+     */
+    private <T> List<T> readMessagesFromStream(Function<byte[], T> deserialiser) throws Exception {
+        final GetShardIteratorRequest iteratorRequest =
+                GetShardIteratorRequest.builder()
+                        .shardId(DEFAULT_FIRST_SHARD_NAME)
+                        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
+                        .streamName(KinesisStreamsTableApiIT.ORDERS_STREAM)
+                        .build();
+        final String iterator = kinesisClient.getShardIterator(iteratorRequest).shardIterator();
+
+        final GetRecordsRequest recordsRequest =
+                GetRecordsRequest.builder().shardIterator(iterator).build();
+        final List<Record> fetched = kinesisClient.getRecords(recordsRequest).records();
+
+        final List<T> decoded = new ArrayList<>();
+        for (Record kinesisRecord : fetched) {
+            decoded.add(deserialiser.apply(kinesisRecord.data().asByteArray()));
+        }
+        return decoded;
+    }
+
+    /** Immutable order value that the JSON records read by this e2e test are mapped onto. */
+    public static class Order {
+        private final String code;
+        private final int quantity;
+
+        /**
+         * @param code order code, bound to the JSON property "code"
+         * @param quantity ordered amount, bound to the JSON property "quantity"
+         */
+        public Order(
+                @JsonProperty("code") final String code, @JsonProperty("quantity") int quantity) {
+            this.code = code;
+            this.quantity = quantity;
+        }
+
+        /** Two orders are equal when they are of the same class with equal code and quantity. */
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+
+            final Order that = (Order) o;
+            return Objects.equals(code, that.code) && quantity == that.quantity;
+        }
+
+        /** Hash over the same fields that {@link #equals(Object)} compares. */
+        @Override
+        public int hashCode() {
+            return Objects.hash(code, quantity);
+        }
+
+        /** Readable form used in assertion failure output. */
+        @Override
+        public String toString() {
+            return String.format("Order{code: %s, quantity: %d}", code, quantity);
+        }
+    }
+}
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# The root logger level is OFF by default so that test runs do not flood the build logs.
+# Set it to INFO manually when debugging.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+# Console appender referenced by the root logger above; it targets SYSTEM_ERR.
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/send-orders.sql b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/send-orders.sql
new file mode 100644
index 0000000..29f2c88
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/resources/send-orders.sql
@@ -0,0 +1,36 @@
+--/*
+-- * Licensed to the Apache Software Foundation (ASF) under one
+-- * or more contributor license agreements. See the NOTICE file
+-- * distributed with this work for additional information
+-- * regarding copyright ownership. The ASF licenses this file
+-- * to you under the Apache License, Version 2.0 (the
+-- * "License"); you may not use this file except in compliance
+-- * with the License. You may obtain a copy of the License at
+-- *
+-- * http://www.apache.org/licenses/LICENSE-2.0
+-- *
+-- * Unless required by applicable law or agreed to in writing, software
+-- * distributed under the License is distributed on an "AS IS" BASIS,
+-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- * See the License for the specific language governing permissions and
+-- * limitations under the License.
+-- */
+
+-- Table over the orders stream, reached through the kinesalite endpoint with
+-- basic placeholder credentials and certificate trust enabled for the test.
+CREATE TABLE orders (
+    `code` STRING,
+    `quantity` BIGINT
+) WITH (
+    'connector' = 'kinesis',
+    'stream' = 'orders',
+    'aws.region' = 'us-east-1',
+    'aws.endpoint' = 'https://kinesalite:4567',
+    'aws.credentials.provider' = 'BASIC',
+    'aws.credentials.basic.accesskeyid' = 'access key',
+    'aws.credentials.basic.secretkey' = 'secret key',
+    'aws.trust.all.certificates' = 'true',
+    'sink.http-client.protocol.version' = 'HTTP1_1',
+    'sink.batch.max-size' = '1',
+    'format' = 'json'
+);
+
+-- The five rows below are the orders that KinesisStreamsTableApiIT expects to read back.
+-- The explicit column list keeps the insert independent of the column order in the table.
+INSERT INTO orders (`code`, `quantity`) VALUES
+    ('A', 10),
+    ('B', 12),
+    ('C', 14),
+    ('D', 16),
+    ('E', 18);
diff --git a/flink-connector-aws-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/pom.xml
index 0835d85..b1266aa 100644
--- a/flink-connector-aws-e2e-tests/pom.xml
+++ b/flink-connector-aws-e2e-tests/pom.xml
@@ -40,8 +40,17 @@ under the License.
<modules>
<module>flink-connector-aws-kinesis-firehose-e2e-tests</module>
+ <module>flink-connector-aws-kinesis-streams-e2e-tests</module>
</modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
<dependencyManagement>
<!-- For dependency convergence -->
<dependencies>