You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/05/07 15:25:12 UTC

[flink] 01/02: [FLINK-17170][kinesis] Move KinesaliteContainer to flink-connector-kinesis.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a102549f08759e177074dad286fef2f56176d005
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue May 4 18:01:18 2021 +0200

    [FLINK-17170][kinesis] Move KinesaliteContainer to flink-connector-kinesis.
    
    This testcontainer will be used in an ITCase in the next commit.
    
    Also move system properties required for test into pom.xml.
---
 .../kinesis/testutils/KinesaliteContainer.java     | 137 +++++++++++++++++++++
 .../flink-streaming-kinesis-test/pom.xml           |  16 +++
 .../kinesis/test/KinesisTableApiITCase.java        |  51 +++-----
 .../test/containers/KinesaliteContainer.java       |  52 --------
 .../resources}/filter-large-orders.sql             |   0
 .../src/test/resources/log4j2-test.properties      |  28 +++++
 6 files changed, 201 insertions(+), 83 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java
new file mode 100644
index 0000000..800ab96
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.model.ListStreamsResult;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static com.amazonaws.SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR;
+import static com.amazonaws.SDKGlobalConfiguration.SECRET_KEY_ENV_VAR;
+
+/**
+ * A testcontainer based on Kinesalite.
+ *
+ * <p>Note that the more obvious localstack container with Kinesis took 1 minute to start vs 10
+ * seconds of Kinesalite.
+ */
+public class KinesaliteContainer extends GenericContainer<KinesaliteContainer> {
+    private static final String ACCESS_KEY = "access key";
+    private static final String SECRET_KEY = "secret key";
+
+    public KinesaliteContainer(DockerImageName imageName) {
+        super(imageName);
+
+        withEnv(ACCESS_KEY_ENV_VAR, ACCESS_KEY);
+        withEnv(SECRET_KEY_ENV_VAR, ACCESS_KEY);
+        withExposedPorts(4567);
+        waitingFor(new ListStreamsWaitStrategy());
+        withCreateContainerCmdModifier(
+                cmd ->
+                        cmd.withEntrypoint(
+                                "/tini",
+                                "--",
+                                "/usr/src/app/node_modules/kinesalite/cli.js",
+                                "--path",
+                                "/var/lib/kinesalite",
+                                "--ssl"));
+    }
+
+    /** Returns the endpoint url to access the container from outside the docker network. */
+    public String getContainerEndpointUrl() {
+        return String.format("https://%s:%s", getContainerIpAddress(), getMappedPort(4567));
+    }
+
+    /** Returns the endpoint url to access the host from inside the docker network. */
+    public String getHostEndpointUrl() {
+        return String.format("https://%s:%s", getHost(), getMappedPort(4567));
+    }
+
+    public String getAccessKey() {
+        return ACCESS_KEY;
+    }
+
+    public String getSecretKey() {
+        return SECRET_KEY;
+    }
+
+    /** Returns the properties to access the container from outside the docker network. */
+    public Properties getContainerProperties() {
+        return getProperties(getContainerEndpointUrl());
+    }
+
+    /** Returns the properties to access the host from inside the docker network. */
+    public Properties getHostProperties() {
+        return getProperties(getHostEndpointUrl());
+    }
+
+    /** Returns the client to access the container from outside the docker network. */
+    public AmazonKinesis getContainerClient() {
+        return getClient(getContainerEndpointUrl());
+    }
+
+    /** Returns the client to access the host from inside the docker network. */
+    public AmazonKinesis getHostClient() {
+        return getClient(getHostEndpointUrl());
+    }
+
+    private AmazonKinesis getClient(String endPoint) {
+        return AmazonKinesisClientBuilder.standard()
+                .withCredentials(
+                        new AWSStaticCredentialsProvider(
+                                new BasicAWSCredentials(getAccessKey(), getSecretKey())))
+                .withEndpointConfiguration(
+                        new AwsClientBuilder.EndpointConfiguration(endPoint, "us-east-1"))
+                .build();
+    }
+
+    private Properties getProperties(String endpointUrl) {
+        Properties config = new Properties();
+        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+        config.setProperty(AWSConfigConstants.AWS_ENDPOINT, endpointUrl);
+        config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, getAccessKey());
+        config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, getSecretKey());
+        return config;
+    }
+
+    private class ListStreamsWaitStrategy extends AbstractWaitStrategy {
+        @Override
+        protected void waitUntilReady() {
+            Unreliables.retryUntilSuccess(
+                    (int) this.startupTimeout.getSeconds(),
+                    TimeUnit.SECONDS,
+                    () -> this.getRateLimiter().getWhenReady(() -> tryList()));
+        }
+
+        private ListStreamsResult tryList() {
+            return getContainerClient().listStreams();
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
index c398c27..cd4af0e 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -125,6 +125,22 @@ under the License.
 					</artifactItems>
 				</configuration>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.22.1</version>
+				<configuration>
+					<systemPropertyVariables>
+						<!-- Required for Kinesalite. -->
+						<!-- Including shaded and non-shaded conf to support test running from Maven and IntelliJ -->
+						<com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
+						<com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
+						<org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor>true</org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor>
+						<org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking>true</org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
index bed4cf4..5256a50 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.kinesis.test;
 
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
-import org.apache.flink.streaming.kinesis.test.containers.KinesaliteContainer;
 import org.apache.flink.streaming.kinesis.test.model.Order;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.categories.TravisGroup1;
@@ -34,11 +34,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.Timeout;
 import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -49,62 +49,52 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /** End-to-end test for Kinesis Table API using Kinesalite. */
 @Category(value = {TravisGroup1.class})
 public class KinesisTableApiITCase extends TestLogger {
     private static final String ORDERS_STREAM = "orders";
     private static final String LARGE_ORDERS_STREAM = "large_orders";
+    private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
 
     private final Path sqlConnectorKinesisJar = TestUtils.getResource(".*kinesis.jar");
-    private final Network network = Network.newNetwork();
+    private static final Network network = Network.newNetwork();
 
     @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
 
-    @Rule
-    public final KinesaliteContainer kinesalite = new KinesaliteContainer().withNetwork(network);
+    @ClassRule
+    public static final KinesaliteContainer KINESALITE =
+            new KinesaliteContainer(
+                            DockerImageName.parse("instructure/kinesalite").withTag("latest"))
+                    .withNetwork(network)
+                    .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
 
     private KinesisPubsubClient kinesisClient;
 
-    @Rule
-    public final FlinkContainer flink =
+    @ClassRule
+    public static final FlinkContainer FLINK =
             FlinkContainer.builder()
                     .build()
-                    .withEnv("AWS_ACCESS_KEY_ID", "fakeid")
-                    .withEnv("AWS_SECRET_KEY", "fakekey")
+                    .withEnv("AWS_ACCESS_KEY_ID", KINESALITE.getAccessKey())
+                    .withEnv("AWS_SECRET_KEY", KINESALITE.getSecretKey())
                     .withEnv("AWS_CBOR_DISABLE", "1")
                     .withEnv(
                             "FLINK_ENV_JAVA_OPTS",
                             "-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking")
                     .withNetwork(network)
-                    .dependsOn(kinesalite);
+                    .dependsOn(KINESALITE);
 
     @Before
     public void setUp() throws Exception {
-        // Required for Kinesalite.
-        // Including shaded and non-shaded conf to support test running from Maven and IntelliJ
-        System.setProperty("com.amazonaws.sdk.disableCertChecking", "1");
-        System.setProperty("com.amazonaws.sdk.disableCbor", "1");
-        System.setProperty(
-                "org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking", "1");
-        System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "1");
-
-        Properties properties = new Properties();
-        properties.setProperty(AWS_ENDPOINT, kinesalite.getEndpointUrl());
-        properties.setProperty(AWS_ACCESS_KEY_ID, "ak");
-        properties.setProperty(AWS_SECRET_ACCESS_KEY, "sk");
+        Properties properties = KINESALITE.getContainerProperties();
 
         kinesisClient = new KinesisPubsubClient(properties);
         kinesisClient.createTopic(ORDERS_STREAM, 1, properties);
         kinesisClient.createTopic(LARGE_ORDERS_STREAM, 1, properties);
     }
 
-    @Test(timeout = 120_000)
+    @Test
     public void testTableApiSourceAndSink() throws Exception {
         List<Order> smallOrders = ImmutableList.of(new Order("A", 5), new Order("B", 10));
 
@@ -118,8 +108,7 @@ public class KinesisTableApiITCase extends TestLogger {
         executeSqlStatements(readSqlFile("filter-large-orders.sql"));
 
         List<Order> result = readAllOrdersFromKinesis(kinesisClient);
-        assertEquals(expected.size(), result.size());
-        assertTrue(result.containsAll(expected));
+        assertEquals(expected, result);
     }
 
     private List<Order> readAllOrdersFromKinesis(final KinesisPubsubClient client)
@@ -138,11 +127,11 @@ public class KinesisTableApiITCase extends TestLogger {
     }
 
     private List<String> readSqlFile(final String resourceName) throws Exception {
-        return Files.readAllLines(Paths.get(getClass().getResource(resourceName).toURI()));
+        return Files.readAllLines(Paths.get(getClass().getResource("/" + resourceName).toURI()));
     }
 
     private void executeSqlStatements(final List<String> sqlLines) throws Exception {
-        flink.submitSQLJob(
+        FLINK.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                         .addJars(sqlConnectorKinesisJar)
                         .build());
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java
deleted file mode 100644
index 5162106..0000000
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.kinesis.test.containers;
-
-import org.testcontainers.containers.GenericContainer;
-
-/** A test Kinesis Data Streams container using Kinesalite. */
-public class KinesaliteContainer extends GenericContainer<KinesaliteContainer> {
-    public static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
-
-    private static final String IMAGE = "instructure/kinesalite:latest";
-    private static final int PORT = 4567;
-
-    public KinesaliteContainer() {
-        super(IMAGE);
-    }
-
-    @Override
-    protected void configure() {
-        withExposedPorts(PORT);
-        withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
-        withCreateContainerCmdModifier(
-                cmd ->
-                        cmd.withEntrypoint(
-                                "/tini",
-                                "--",
-                                "/usr/src/app/node_modules/kinesalite/cli.js",
-                                "--path",
-                                "/var/lib/kinesalite",
-                                "--ssl"));
-    }
-
-    public String getEndpointUrl() {
-        return "https://" + getHost() + ":" + getMappedPort(PORT);
-    }
-}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/resources/org/apache/flink/streaming/kinesis/test/filter-large-orders.sql b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/filter-large-orders.sql
similarity index 100%
rename from flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/resources/org/apache/flink/streaming/kinesis/test/filter-large-orders.sql
rename to flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/filter-large-orders.sql
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..e463a0e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n