You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/05/07 15:25:12 UTC
[flink] 01/02: [FLINK-17170][kinesis] Move KinesaliteContainer to
flink-connector-kinesis.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a102549f08759e177074dad286fef2f56176d005
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue May 4 18:01:18 2021 +0200
[FLINK-17170][kinesis] Move KinesaliteContainer to flink-connector-kinesis.
This testcontainer will be used in an ITCase in the next commit.
Also move system properties required for test into pom.xml.
---
.../kinesis/testutils/KinesaliteContainer.java | 137 +++++++++++++++++++++
.../flink-streaming-kinesis-test/pom.xml | 16 +++
.../kinesis/test/KinesisTableApiITCase.java | 51 +++-----
.../test/containers/KinesaliteContainer.java | 52 --------
.../resources}/filter-large-orders.sql | 0
.../src/test/resources/log4j2-test.properties | 28 +++++
6 files changed, 201 insertions(+), 83 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java
new file mode 100644
index 0000000..800ab96
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.model.ListStreamsResult;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static com.amazonaws.SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR;
+import static com.amazonaws.SDKGlobalConfiguration.SECRET_KEY_ENV_VAR;
+
+/**
+ * A testcontainer based on Kinesalite.
+ *
+ * <p>Note that the more obvious localstack container with Kinesis took 1 minute to start vs 10
+ * seconds of Kinesalite.
+ */
+public class KinesaliteContainer extends GenericContainer<KinesaliteContainer> {
+    private static final String ACCESS_KEY = "access key";
+    private static final String SECRET_KEY = "secret key";
+
+    /** Container port that is exposed for the Kinesalite endpoint. */
+    private static final int PORT = 4567;
+
+    /** Region configured on the clients and connector properties created by this container. */
+    private static final String REGION = "us-east-1";
+
+    /**
+     * Creates a Kinesalite container that exposes port {@value #PORT}, starts Kinesalite with
+     * {@code --ssl} and is considered ready once a ListStreams request succeeds.
+     *
+     * @param imageName the docker image to start
+     */
+    public KinesaliteContainer(DockerImageName imageName) {
+        super(imageName);
+
+        withEnv(ACCESS_KEY_ENV_VAR, ACCESS_KEY);
+        // The secret key variable must carry the secret key so that the container environment
+        // agrees with the credentials returned by getSecretKey().
+        withEnv(SECRET_KEY_ENV_VAR, SECRET_KEY);
+        withExposedPorts(PORT);
+        waitingFor(new ListStreamsWaitStrategy());
+        withCreateContainerCmdModifier(
+                cmd ->
+                        cmd.withEntrypoint(
+                                "/tini",
+                                "--",
+                                "/usr/src/app/node_modules/kinesalite/cli.js",
+                                "--path",
+                                "/var/lib/kinesalite",
+                                "--ssl"));
+    }
+
+    /** Returns the endpoint url to access the container from outside the docker network. */
+    public String getContainerEndpointUrl() {
+        return String.format("https://%s:%s", getContainerIpAddress(), getMappedPort(PORT));
+    }
+
+    /** Returns the endpoint url to access the host from inside the docker network. */
+    public String getHostEndpointUrl() {
+        return String.format("https://%s:%s", getHost(), getMappedPort(PORT));
+    }
+
+    /** Returns the access key that clients of this container have to use. */
+    public String getAccessKey() {
+        return ACCESS_KEY;
+    }
+
+    /** Returns the secret key that clients of this container have to use. */
+    public String getSecretKey() {
+        return SECRET_KEY;
+    }
+
+    /** Returns the properties to access the container from outside the docker network. */
+    public Properties getContainerProperties() {
+        return getProperties(getContainerEndpointUrl());
+    }
+
+    /** Returns the properties to access the host from inside the docker network. */
+    public Properties getHostProperties() {
+        return getProperties(getHostEndpointUrl());
+    }
+
+    /** Returns the client to access the container from outside the docker network. */
+    public AmazonKinesis getContainerClient() {
+        return getClient(getContainerEndpointUrl());
+    }
+
+    /** Returns the client to access the host from inside the docker network. */
+    public AmazonKinesis getHostClient() {
+        return getClient(getHostEndpointUrl());
+    }
+
+    /** Builds a new Kinesis client for the given endpoint using this container's credentials. */
+    private AmazonKinesis getClient(String endPoint) {
+        return AmazonKinesisClientBuilder.standard()
+                .withCredentials(
+                        new AWSStaticCredentialsProvider(
+                                new BasicAWSCredentials(getAccessKey(), getSecretKey())))
+                .withEndpointConfiguration(
+                        new AwsClientBuilder.EndpointConfiguration(endPoint, REGION))
+                .build();
+    }
+
+    /** Builds connector properties (region, endpoint, credentials) for the given endpoint. */
+    private Properties getProperties(String endpointUrl) {
+        Properties config = new Properties();
+        config.setProperty(AWSConfigConstants.AWS_REGION, REGION);
+        config.setProperty(AWSConfigConstants.AWS_ENDPOINT, endpointUrl);
+        config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, getAccessKey());
+        config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, getSecretKey());
+        return config;
+    }
+
+    /**
+     * Wait strategy that retries a ListStreams request against the container until it succeeds
+     * or the startup timeout has elapsed.
+     */
+    private class ListStreamsWaitStrategy extends AbstractWaitStrategy {
+        @Override
+        protected void waitUntilReady() {
+            Unreliables.retryUntilSuccess(
+                    (int) this.startupTimeout.getSeconds(),
+                    TimeUnit.SECONDS,
+                    () -> this.getRateLimiter().getWhenReady(() -> tryList()));
+        }
+
+        /** Issues a single ListStreams request through a freshly created container client. */
+        private ListStreamsResult tryList() {
+            return getContainerClient().listStreams();
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
index c398c27..cd4af0e 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -125,6 +125,22 @@ under the License.
</artifactItems>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.22.1</version>
+ <configuration>
+ <systemPropertyVariables>
+ <!-- Required for Kinesalite; surefire passes these entries to the test JVM as system properties. -->
+ <!-- Both the plain and the shaded (org.apache.flink.kinesis.shaded.*) property names are set to support running the tests from Maven and IntelliJ. -->
+ <com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
+ <com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
+ <org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor>true</org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor>
+ <org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking>true</org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
</plugins>
</build>
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
index bed4cf4..5256a50 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
@@ -19,8 +19,8 @@
package org.apache.flink.streaming.kinesis.test;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
-import org.apache.flink.streaming.kinesis.test.containers.KinesaliteContainer;
import org.apache.flink.streaming.kinesis.test.model.Order;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.categories.TravisGroup1;
@@ -34,11 +34,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -49,62 +49,52 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/** End-to-end test for Kinesis Table API using Kinesalite. */
@Category(value = {TravisGroup1.class})
public class KinesisTableApiITCase extends TestLogger {
private static final String ORDERS_STREAM = "orders";
private static final String LARGE_ORDERS_STREAM = "large_orders";
+ private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
private final Path sqlConnectorKinesisJar = TestUtils.getResource(".*kinesis.jar");
- private final Network network = Network.newNetwork();
+ private static final Network network = Network.newNetwork();
@ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
- @Rule
- public final KinesaliteContainer kinesalite = new KinesaliteContainer().withNetwork(network);
+ @ClassRule
+ public static final KinesaliteContainer KINESALITE =
+ new KinesaliteContainer(
+ DockerImageName.parse("instructure/kinesalite").withTag("latest"))
+ .withNetwork(network)
+ .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
private KinesisPubsubClient kinesisClient;
- @Rule
- public final FlinkContainer flink =
+ @ClassRule
+ public static final FlinkContainer FLINK =
FlinkContainer.builder()
.build()
- .withEnv("AWS_ACCESS_KEY_ID", "fakeid")
- .withEnv("AWS_SECRET_KEY", "fakekey")
+ .withEnv("AWS_ACCESS_KEY_ID", KINESALITE.getAccessKey())
+ .withEnv("AWS_SECRET_KEY", KINESALITE.getSecretKey())
.withEnv("AWS_CBOR_DISABLE", "1")
.withEnv(
"FLINK_ENV_JAVA_OPTS",
"-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking")
.withNetwork(network)
- .dependsOn(kinesalite);
+ .dependsOn(KINESALITE);
@Before
public void setUp() throws Exception {
- // Required for Kinesalite.
- // Including shaded and non-shaded conf to support test running from Maven and IntelliJ
- System.setProperty("com.amazonaws.sdk.disableCertChecking", "1");
- System.setProperty("com.amazonaws.sdk.disableCbor", "1");
- System.setProperty(
- "org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking", "1");
- System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "1");
-
- Properties properties = new Properties();
- properties.setProperty(AWS_ENDPOINT, kinesalite.getEndpointUrl());
- properties.setProperty(AWS_ACCESS_KEY_ID, "ak");
- properties.setProperty(AWS_SECRET_ACCESS_KEY, "sk");
+ Properties properties = KINESALITE.getContainerProperties();
kinesisClient = new KinesisPubsubClient(properties);
kinesisClient.createTopic(ORDERS_STREAM, 1, properties);
kinesisClient.createTopic(LARGE_ORDERS_STREAM, 1, properties);
}
- @Test(timeout = 120_000)
+ @Test
public void testTableApiSourceAndSink() throws Exception {
List<Order> smallOrders = ImmutableList.of(new Order("A", 5), new Order("B", 10));
@@ -118,8 +108,7 @@ public class KinesisTableApiITCase extends TestLogger {
executeSqlStatements(readSqlFile("filter-large-orders.sql"));
List<Order> result = readAllOrdersFromKinesis(kinesisClient);
- assertEquals(expected.size(), result.size());
- assertTrue(result.containsAll(expected));
+ assertEquals(expected, result);
}
private List<Order> readAllOrdersFromKinesis(final KinesisPubsubClient client)
@@ -138,11 +127,11 @@ public class KinesisTableApiITCase extends TestLogger {
}
private List<String> readSqlFile(final String resourceName) throws Exception {
- return Files.readAllLines(Paths.get(getClass().getResource(resourceName).toURI()));
+ return Files.readAllLines(Paths.get(getClass().getResource("/" + resourceName).toURI()));
}
private void executeSqlStatements(final List<String> sqlLines) throws Exception {
- flink.submitSQLJob(
+ FLINK.submitSQLJob(
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
.addJars(sqlConnectorKinesisJar)
.build());
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java
deleted file mode 100644
index 5162106..0000000
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.kinesis.test.containers;
-
-import org.testcontainers.containers.GenericContainer;
-
-/** A test Kinesis Data Streams container using Kinesalite. */
-public class KinesaliteContainer extends GenericContainer<KinesaliteContainer> {
- public static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
-
- private static final String IMAGE = "instructure/kinesalite:latest";
- private static final int PORT = 4567;
-
- public KinesaliteContainer() {
- super(IMAGE);
- }
-
- @Override
- protected void configure() {
- withExposedPorts(PORT);
- withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
- withCreateContainerCmdModifier(
- cmd ->
- cmd.withEntrypoint(
- "/tini",
- "--",
- "/usr/src/app/node_modules/kinesalite/cli.js",
- "--path",
- "/var/lib/kinesalite",
- "--ssl"));
- }
-
- public String getEndpointUrl() {
- return "https://" + getHost() + ":" + getMappedPort(PORT);
- }
-}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/resources/org/apache/flink/streaming/kinesis/test/filter-large-orders.sql b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/filter-large-orders.sql
similarity index 100%
rename from flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/resources/org/apache/flink/streaming/kinesis/test/filter-large-orders.sql
rename to flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/filter-large-orders.sql
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..e463a0e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n