Posted to commits@flink.apache.org by ar...@apache.org on 2021/05/07 15:25:11 UTC

[flink] branch release-1.13 updated (7d569e6 -> 282f9a3)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7d569e6  [hotfix][docs] Fix typo in dependency_management.md
     new a102549  [FLINK-17170][kinesis] Move KinesaliteContainer to flink-connector-kinesis.
     new 282f9a3  [FLINK-17170][kinesis] Fix deadlock during stop-with-savepoint.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-connectors/flink-connector-kinesis/pom.xml   |  19 +++
 .../connectors/kinesis/FlinkKinesisConsumer.java   |   5 +-
 .../connectors/kinesis/FlinkKinesisITCase.java     | 157 +++++++++++++++++++++
 .../kinesis/testutils/KinesaliteContainer.java     | 137 ++++++++++++++++++
 .../flink-streaming-kinesis-test/pom.xml           |  16 +++
 .../kinesis/test/KinesisTableApiITCase.java        |  51 +++----
 .../test/containers/KinesaliteContainer.java       |  52 -------
 .../resources}/filter-large-orders.sql             |   0
 .../src/test/resources/log4j2-test.properties      |   2 +-
 9 files changed, 353 insertions(+), 86 deletions(-)
 create mode 100644 flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
 create mode 100644 flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java
 delete mode 100644 flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java
 rename flink-end-to-end-tests/flink-streaming-kinesis-test/src/{main/resources/org/apache/flink/streaming/kinesis/test => test/resources}/filter-large-orders.sql (100%)
 copy {flink-connectors/flink-connector-cassandra => flink-end-to-end-tests/flink-streaming-kinesis-test}/src/test/resources/log4j2-test.properties (98%)

[flink] 02/02: [FLINK-17170][kinesis] Fix deadlock during stop-with-savepoint.

Posted by ar...@apache.org.

arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 282f9a3d5505a5aa58d7d9cca466939610d41ed3
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue May 4 09:38:30 2021 +0200

    [FLINK-17170][kinesis] Fix deadlock during stop-with-savepoint.
    
    During stop-with-savepoint, cancel is called under the checkpoint lock in legacy sources. Thus, if the fetcher is trying to emit a record at the same time, it cannot obtain the checkpoint lock. This leads to a deadlock because cancel awaits the termination of the fetcher while holding that lock.
    The fix is to rely mostly on the termination inside the run method. As a safeguard, close also awaits termination, since close is always called without the lock.
---
 flink-connectors/flink-connector-kinesis/pom.xml   |  19 +++
 .../connectors/kinesis/FlinkKinesisConsumer.java   |   5 +-
 .../connectors/kinesis/FlinkKinesisITCase.java     | 157 +++++++++++++++++++++
 3 files changed, 179 insertions(+), 2 deletions(-)
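
The locking pattern described in the commit message can be illustrated with a minimal, self-contained sketch. It is not the FlinkKinesisConsumer code: the class CancelUnderLockSketch, its fields, and the 5 second timeout are hypothetical stand-ins chosen for illustration. It assumes a fetcher thread that needs the checkpoint lock to emit records, a cancel() that only signals shutdown (so it is safe to call under the lock), and a close() that waits for the fetcher outside the lock.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-in for a legacy source; names are illustrative only. */
class CancelUnderLockSketch {
    private final Object checkpointLock = new Object();
    private final CountDownLatch fetcherStarted = new CountDownLatch(1);
    private volatile boolean running = true;
    private final Thread fetcher = new Thread(this::fetchLoop, "fetcher");

    private void fetchLoop() {
        fetcherStarted.countDown();
        while (running) {
            // Emitting a record requires the checkpoint lock.
            synchronized (checkpointLock) {
                // a record would be emitted here
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                return; // interrupted by cancel(): stop fetching
            }
        }
    }

    /** Only signals shutdown and never waits, so it may be called under the lock. */
    void cancel() {
        running = false;
        fetcher.interrupt();
    }

    /** Called without the lock, so waiting for the fetcher here cannot deadlock. */
    boolean close(long timeoutMillis) throws InterruptedException {
        cancel();
        fetcher.join(timeoutMillis);
        return !fetcher.isAlive();
    }

    /** Simulates stop-with-savepoint: cancel under the lock, then close without it. */
    static boolean stopsCleanly() throws InterruptedException {
        CancelUnderLockSketch source = new CancelUnderLockSketch();
        source.fetcher.start();
        source.fetcherStarted.await();
        synchronized (source.checkpointLock) {
            source.cancel();
            // Waiting for the fetcher at this point (while still holding the lock)
            // could block forever if the fetcher is waiting to enter its
            // synchronized block; that is the deadlock the commit describes.
        }
        return source.close(5_000);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fetcher terminated: " + stopsCleanly());
    }
}
```

The design choice mirrored here is that the blocking wait moves out of the method that may run under the lock and into the one that is always called without it.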

diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 4339ac1..6aae404 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -169,6 +169,13 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>testcontainers</artifactId>
+			<version>1.15.3</version>
+			<scope>test</scope>
+		</dependency>
+
 		<!-- Kinesis table factory testing -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -336,6 +343,18 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.22.1</version>
+				<configuration>
+					<systemPropertyVariables>
+						<com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
+						<com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 </project>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 7f30dca..b0a729f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -401,7 +401,6 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
         running = false;
 
         KinesisDataFetcher fetcher = this.fetcher;
-        this.fetcher = null;
 
         // this method might be called before the subtask actually starts running,
         // so we must check if the fetcher is actually created
@@ -409,7 +408,6 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
             try {
                 // interrupt the fetcher of any work
                 fetcher.shutdownFetcher();
-                fetcher.awaitTermination();
             } catch (Exception e) {
                 LOG.warn("Error while closing Kinesis data fetcher", e);
             }
@@ -419,6 +417,9 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
     @Override
     public void close() throws Exception {
         cancel();
+        // safe-guard when the fetcher has been interrupted, make sure to not leak resources
+        fetcher.awaitTermination();
+        this.fetcher = null;
         super.close();
     }
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
new file mode 100644
index 0000000..0b224c3
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertThat;
+
+/** IT cases for using Kinesis consumer/producer based on Kinesalite. */
+public class FlinkKinesisITCase {
+    public static final String TEST_STREAM = "test_stream";
+
+    @ClassRule
+    public static MiniClusterWithClientResource miniCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder().build());
+
+    @ClassRule
+    public static KinesaliteContainer kinesalite =
+            new KinesaliteContainer(
+                    DockerImageName.parse("instructure/kinesalite").withTag("latest"));
+
+    @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+    private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
+
+    private KinesisPubsubClient client;
+
+    @Before
+    public void setupClient() {
+        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+    }
+
+    /**
+     * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
+     *
+     * <ol>
+     *   <li>The test sets up a stream with 10 records and creates a Flink job that reads them
+     *       very slowly (using up a large chunk of time of the mailbox).
+     *   <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
+     *   <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
+     *       cancel.
+     *   <li>With the fix, the job proceeds and we can lift the backpressure.
+     * </ol>
+     */
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        client.createTopic(TEST_STREAM, 1, new Properties());
+
+        // add elements to the test stream
+        int numElements = 10;
+        List<String> elements =
+                IntStream.range(0, numElements)
+                        .mapToObj(String::valueOf)
+                        .collect(Collectors.toList());
+        for (String element : elements) {
+            client.sendMessage(TEST_STREAM, element);
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        Properties config = kinesalite.getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        FlinkKinesisConsumer<String> consumer =
+                new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+
+        // call stop with savepoint in another thread
+        ForkJoinTask<Object> stopTask =
+                ForkJoinPool.commonPool()
+                        .submit(
+                                () -> {
+                                    WaitingMapper.firstElement.await();
+                                    stopWithSavepoint();
+                                    WaitingMapper.stopped = true;
+                                    return null;
+                                });
+
+        try {
+            List<String> result =
+                    env.addSource(consumer).map(new WaitingMapper()).executeAndCollect(10000);
+            // stop with savepoint will most likely only return a small subset of the elements
+            // validate that the prefix is as expected
+            assertThat(result, hasSize(lessThan(numElements)));
+            assertThat(result, equalTo(elements.subList(0, result.size())));
+        } finally {
+            stopTask.cancel(true);
+        }
+    }
+
+    private String stopWithSavepoint() throws Exception {
+        JobStatusMessage job =
+                miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+        return miniCluster
+                .getClusterClient()
+                .stopWithSavepoint(job.getJobId(), true, temp.getRoot().getAbsolutePath())
+                .get();
+    }
+
+    private static class WaitingMapper implements MapFunction<String, String> {
+        static CountDownLatch firstElement = new CountDownLatch(1);
+        static volatile boolean stopped = false;
+
+        @Override
+        public String map(String value) throws Exception {
+            if (firstElement.getCount() > 0) {
+                firstElement.countDown();
+            }
+            if (!stopped) {
+                Thread.sleep(100);
+            }
+            return value;
+        }
+    }
+}

[flink] 01/02: [FLINK-17170][kinesis] Move KinesaliteContainer to flink-connector-kinesis.

Posted by ar...@apache.org.

arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a102549f08759e177074dad286fef2f56176d005
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue May 4 18:01:18 2021 +0200

    [FLINK-17170][kinesis] Move KinesaliteContainer to flink-connector-kinesis.
    
    This testcontainer will be used in an ITCase in the next commit.
    
    Also move system properties required for test into pom.xml.
---
 .../kinesis/testutils/KinesaliteContainer.java     | 137 +++++++++++++++++++++
 .../flink-streaming-kinesis-test/pom.xml           |  16 +++
 .../kinesis/test/KinesisTableApiITCase.java        |  51 +++-----
 .../test/containers/KinesaliteContainer.java       |  52 --------
 .../resources}/filter-large-orders.sql             |   0
 .../src/test/resources/log4j2-test.properties      |  28 +++++
 6 files changed, 201 insertions(+), 83 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java
new file mode 100644
index 0000000..800ab96
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.model.ListStreamsResult;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static com.amazonaws.SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR;
+import static com.amazonaws.SDKGlobalConfiguration.SECRET_KEY_ENV_VAR;
+
+/**
+ * A testcontainer based on Kinesalite.
+ *
+ * <p>Note that the more obvious localstack container with Kinesis took 1 minute to start vs 10
+ * seconds of Kinesalite.
+ */
+public class KinesaliteContainer extends GenericContainer<KinesaliteContainer> {
+    private static final String ACCESS_KEY = "access key";
+    private static final String SECRET_KEY = "secret key";
+
+    public KinesaliteContainer(DockerImageName imageName) {
+        super(imageName);
+
+        withEnv(ACCESS_KEY_ENV_VAR, ACCESS_KEY);
+        withEnv(SECRET_KEY_ENV_VAR, SECRET_KEY);
+        withExposedPorts(4567);
+        waitingFor(new ListStreamsWaitStrategy());
+        withCreateContainerCmdModifier(
+                cmd ->
+                        cmd.withEntrypoint(
+                                "/tini",
+                                "--",
+                                "/usr/src/app/node_modules/kinesalite/cli.js",
+                                "--path",
+                                "/var/lib/kinesalite",
+                                "--ssl"));
+    }
+
+    /** Returns the endpoint url to access the container from outside the docker network. */
+    public String getContainerEndpointUrl() {
+        return String.format("https://%s:%s", getContainerIpAddress(), getMappedPort(4567));
+    }
+
+    /** Returns the endpoint url to access the host from inside the docker network. */
+    public String getHostEndpointUrl() {
+        return String.format("https://%s:%s", getHost(), getMappedPort(4567));
+    }
+
+    public String getAccessKey() {
+        return ACCESS_KEY;
+    }
+
+    public String getSecretKey() {
+        return SECRET_KEY;
+    }
+
+    /** Returns the properties to access the container from outside the docker network. */
+    public Properties getContainerProperties() {
+        return getProperties(getContainerEndpointUrl());
+    }
+
+    /** Returns the properties to access the host from inside the docker network. */
+    public Properties getHostProperties() {
+        return getProperties(getHostEndpointUrl());
+    }
+
+    /** Returns the client to access the container from outside the docker network. */
+    public AmazonKinesis getContainerClient() {
+        return getClient(getContainerEndpointUrl());
+    }
+
+    /** Returns the client to access the host from inside the docker network. */
+    public AmazonKinesis getHostClient() {
+        return getClient(getHostEndpointUrl());
+    }
+
+    private AmazonKinesis getClient(String endPoint) {
+        return AmazonKinesisClientBuilder.standard()
+                .withCredentials(
+                        new AWSStaticCredentialsProvider(
+                                new BasicAWSCredentials(getAccessKey(), getSecretKey())))
+                .withEndpointConfiguration(
+                        new AwsClientBuilder.EndpointConfiguration(endPoint, "us-east-1"))
+                .build();
+    }
+
+    private Properties getProperties(String endpointUrl) {
+        Properties config = new Properties();
+        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+        config.setProperty(AWSConfigConstants.AWS_ENDPOINT, endpointUrl);
+        config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, getAccessKey());
+        config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, getSecretKey());
+        return config;
+    }
+
+    private class ListStreamsWaitStrategy extends AbstractWaitStrategy {
+        @Override
+        protected void waitUntilReady() {
+            Unreliables.retryUntilSuccess(
+                    (int) this.startupTimeout.getSeconds(),
+                    TimeUnit.SECONDS,
+                    () -> this.getRateLimiter().getWhenReady(() -> tryList()));
+        }
+
+        private ListStreamsResult tryList() {
+            return getContainerClient().listStreams();
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
index c398c27..cd4af0e 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -125,6 +125,22 @@ under the License.
 					</artifactItems>
 				</configuration>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.22.1</version>
+				<configuration>
+					<systemPropertyVariables>
+						<!-- Required for Kinesalite. -->
+						<!-- Including shaded and non-shaded conf to support test running from Maven and IntelliJ -->
+						<com.amazonaws.sdk.disableCbor>true</com.amazonaws.sdk.disableCbor>
+						<com.amazonaws.sdk.disableCertChecking>true</com.amazonaws.sdk.disableCertChecking>
+						<org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor>true</org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor>
+						<org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking>true</org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
index bed4cf4..5256a50 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.kinesis.test;
 
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
-import org.apache.flink.streaming.kinesis.test.containers.KinesaliteContainer;
 import org.apache.flink.streaming.kinesis.test.model.Order;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.categories.TravisGroup1;
@@ -34,11 +34,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.Timeout;
 import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -49,62 +49,52 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
-import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /** End-to-end test for Kinesis Table API using Kinesalite. */
 @Category(value = {TravisGroup1.class})
 public class KinesisTableApiITCase extends TestLogger {
     private static final String ORDERS_STREAM = "orders";
     private static final String LARGE_ORDERS_STREAM = "large_orders";
+    private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
 
     private final Path sqlConnectorKinesisJar = TestUtils.getResource(".*kinesis.jar");
-    private final Network network = Network.newNetwork();
+    private static final Network network = Network.newNetwork();
 
     @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
 
-    @Rule
-    public final KinesaliteContainer kinesalite = new KinesaliteContainer().withNetwork(network);
+    @ClassRule
+    public static final KinesaliteContainer KINESALITE =
+            new KinesaliteContainer(
+                            DockerImageName.parse("instructure/kinesalite").withTag("latest"))
+                    .withNetwork(network)
+                    .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
 
     private KinesisPubsubClient kinesisClient;
 
-    @Rule
-    public final FlinkContainer flink =
+    @ClassRule
+    public static final FlinkContainer FLINK =
             FlinkContainer.builder()
                     .build()
-                    .withEnv("AWS_ACCESS_KEY_ID", "fakeid")
-                    .withEnv("AWS_SECRET_KEY", "fakekey")
+                    .withEnv("AWS_ACCESS_KEY_ID", KINESALITE.getAccessKey())
+                    .withEnv("AWS_SECRET_KEY", KINESALITE.getSecretKey())
                     .withEnv("AWS_CBOR_DISABLE", "1")
                     .withEnv(
                             "FLINK_ENV_JAVA_OPTS",
                             "-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking")
                     .withNetwork(network)
-                    .dependsOn(kinesalite);
+                    .dependsOn(KINESALITE);
 
     @Before
     public void setUp() throws Exception {
-        // Required for Kinesalite.
-        // Including shaded and non-shaded conf to support test running from Maven and IntelliJ
-        System.setProperty("com.amazonaws.sdk.disableCertChecking", "1");
-        System.setProperty("com.amazonaws.sdk.disableCbor", "1");
-        System.setProperty(
-                "org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking", "1");
-        System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "1");
-
-        Properties properties = new Properties();
-        properties.setProperty(AWS_ENDPOINT, kinesalite.getEndpointUrl());
-        properties.setProperty(AWS_ACCESS_KEY_ID, "ak");
-        properties.setProperty(AWS_SECRET_ACCESS_KEY, "sk");
+        Properties properties = KINESALITE.getContainerProperties();
 
         kinesisClient = new KinesisPubsubClient(properties);
         kinesisClient.createTopic(ORDERS_STREAM, 1, properties);
         kinesisClient.createTopic(LARGE_ORDERS_STREAM, 1, properties);
     }
 
-    @Test(timeout = 120_000)
+    @Test
     public void testTableApiSourceAndSink() throws Exception {
         List<Order> smallOrders = ImmutableList.of(new Order("A", 5), new Order("B", 10));
 
@@ -118,8 +108,7 @@ public class KinesisTableApiITCase extends TestLogger {
         executeSqlStatements(readSqlFile("filter-large-orders.sql"));
 
         List<Order> result = readAllOrdersFromKinesis(kinesisClient);
-        assertEquals(expected.size(), result.size());
-        assertTrue(result.containsAll(expected));
+        assertEquals(expected, result);
     }
 
     private List<Order> readAllOrdersFromKinesis(final KinesisPubsubClient client)
@@ -138,11 +127,11 @@ public class KinesisTableApiITCase extends TestLogger {
     }
 
     private List<String> readSqlFile(final String resourceName) throws Exception {
-        return Files.readAllLines(Paths.get(getClass().getResource(resourceName).toURI()));
+        return Files.readAllLines(Paths.get(getClass().getResource("/" + resourceName).toURI()));
     }
 
     private void executeSqlStatements(final List<String> sqlLines) throws Exception {
-        flink.submitSQLJob(
+        FLINK.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                         .addJars(sqlConnectorKinesisJar)
                         .build());
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java
deleted file mode 100644
index 5162106..0000000
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/containers/KinesaliteContainer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.kinesis.test.containers;
-
-import org.testcontainers.containers.GenericContainer;
-
-/** A test Kinesis Data Streams container using Kinesalite. */
-public class KinesaliteContainer extends GenericContainer<KinesaliteContainer> {
-    public static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
-
-    private static final String IMAGE = "instructure/kinesalite:latest";
-    private static final int PORT = 4567;
-
-    public KinesaliteContainer() {
-        super(IMAGE);
-    }
-
-    @Override
-    protected void configure() {
-        withExposedPorts(PORT);
-        withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
-        withCreateContainerCmdModifier(
-                cmd ->
-                        cmd.withEntrypoint(
-                                "/tini",
-                                "--",
-                                "/usr/src/app/node_modules/kinesalite/cli.js",
-                                "--path",
-                                "/var/lib/kinesalite",
-                                "--ssl"));
-    }
-
-    public String getEndpointUrl() {
-        return "https://" + getHost() + ":" + getMappedPort(PORT);
-    }
-}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/resources/org/apache/flink/streaming/kinesis/test/filter-large-orders.sql b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/filter-large-orders.sql
similarity index 100%
rename from flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/resources/org/apache/flink/streaming/kinesis/test/filter-large-orders.sql
rename to flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/filter-large-orders.sql
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..e463a0e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Root logger level is set to INFO;
+# set manually to OFF to not flood build logs
+rootLogger.level = INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n