You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/01 12:41:18 UTC

[flink] 03/03: [FLINK-25771][connectors][Cassandra][test] Add a test that raiseCassandraRequestsTimeouts indeed changes the configuration inside the container.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 498ee85f970cb25fb905374cb6c577e9984adc30
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 27 15:05:08 2022 +0100

    [FLINK-25771][connectors][Cassandra][test] Add a test that raiseCassandraRequestsTimeouts indeed changes the configuration inside the container.
    
    (cherry picked from commit 3144fae0dc8f3ef4b2ed6a8da4cdff920bcc4128)
---
 .../cassandra/CassandraConnectorITCase.java        | 33 ++++++++++++++++------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index c1681d5..4c24942 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -70,6 +70,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.Transferable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -86,8 +88,6 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
@@ -111,10 +111,11 @@ public class CassandraConnectorITCase
     private static final String TABLE_POJO = "test";
     private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace";
 
-    @ClassRule
-    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
     @ClassRule
     public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
+
     @Rule public final RetryRule retryRule = new RetryRule();
 
     private static final int PORT = 9042;
@@ -257,21 +258,37 @@ public class CassandraConnectorITCase
                     new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
             String patchedConfiguration =
                     configuration
-                            .replaceAll("request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
+                            .replaceAll(
+                                    "request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
                             .replaceAll(
                                     "read_request_timeout_in_ms: [0-9]+",
                                     "read_request_timeout_in_ms: 15000")
                             .replaceAll(
                                     "write_request_timeout_in_ms: [0-9]+",
                                     "write_request_timeout_in_ms: 6000");
-            Files.write(configurationPath, patchedConfiguration.getBytes(StandardCharsets.UTF_8));
-            CASSANDRA_CONTAINER.withConfigurationOverride(
-                    configurationPath.toAbsolutePath().toString());
+            CASSANDRA_CONTAINER.copyFileToContainer(
+                    Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
+                    "/etc/cassandra/cassandra.yaml");
         } catch (IOException e) {
             throw new RuntimeException("Unable to open Cassandra configuration file ", e);
         }
     }
 
+    @Test
+    public void testRaiseCassandraRequestsTimeouts() throws IOException {
+        // raiseCassandraRequestsTimeouts() was already called in @BeforeClass,
+        // do not change the container conf twice, just assert that it was indeed changed in the
+        // container
+        final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+        CASSANDRA_CONTAINER.copyFileFromContainer(
+                "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
+        final String configuration =
+                new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
+        assertTrue(configuration.contains("request_timeout_in_ms: 30000"));
+        assertTrue(configuration.contains("read_request_timeout_in_ms: 15000"));
+        assertTrue(configuration.contains("write_request_timeout_in_ms: 6000"));
+    }
+
     @Before
     public void createTable() {
         tableID = random.nextInt(Integer.MAX_VALUE);