You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/01 12:41:15 UTC

[flink] branch release-1.14 updated (e074805 -> 498ee85)

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e074805  [FLINK-25818][Docs][Kafka] Add explanation how Kafka Source deals with idleness when parallelism is higher than the number of partitions
     new 0a294e3  [FLINK-25147][connectors][Cassandra][test] Use parent test containers version
     new b7a0e02  [FLINK-25771][connectors][Cassandra][test] Raise all read/write/miscellaneous requests timeouts
     new 498ee85  [FLINK-25771][connectors][Cassandra][test] Add a test that raiseCassandraRequestsTimeouts indeed changes the configuration inside the container.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-connectors/flink-connector-cassandra/pom.xml |  2 -
 .../cassandra/CassandraConnectorITCase.java        | 58 ++++++++++++++++++++--
 2 files changed, 55 insertions(+), 5 deletions(-)

[flink] 02/03: [FLINK-25771][connectors][Cassandra][test] Raise all read/write/miscellaneous requests timeouts

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7a0e02ed84c26bb876465f93ed2d3649eeb4b86
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 25 16:16:04 2022 +0100

    [FLINK-25771][connectors][Cassandra][test] Raise all read/write/miscellaneous requests timeouts
    
    (cherry picked from commit 9d44bc0e973cca6d03ed222158b7b5ce70306ab0)
---
 .../cassandra/CassandraConnectorITCase.java        | 41 ++++++++++++++++++++--
 1 file changed, 38 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index cd85989..c1681d5 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -66,11 +66,16 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -81,6 +86,8 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
@@ -97,15 +104,17 @@ public class CassandraConnectorITCase
                 Tuple3<String, Integer, Integer>,
                 CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
 
-    @ClassRule
-    public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
-
     private static final int MAX_CONNECTION_RETRY = 3;
     private static final long CONNECTION_RETRY_DELAY = 500L;
     private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+    private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
     private static final String TABLE_POJO = "test";
     private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace";
 
+    @ClassRule
+    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @ClassRule
+    public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
     @Rule public final RetryRule retryRule = new RetryRule();
 
     private static final int PORT = 9042;
@@ -199,11 +208,13 @@ public class CassandraConnectorITCase
     public static CassandraContainer createCassandraContainer() {
         CassandraContainer cassandra = new CassandraContainer(DockerImageVersions.CASSANDRA_3);
         cassandra.withJmxReporting(false);
+        cassandra.withLogConsumer(LOG_CONSUMER);
         return cassandra;
     }
 
     @BeforeClass
     public static void startAndInitializeCassandra() {
+        raiseCassandraRequestsTimeouts();
         // CASSANDRA_CONTAINER#start() already contains retrials
         CASSANDRA_CONTAINER.start();
         cluster = CASSANDRA_CONTAINER.getCluster();
@@ -237,6 +248,30 @@ public class CassandraConnectorITCase
                 CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
     }
 
+    private static void raiseCassandraRequestsTimeouts() {
+        try {
+            final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+            CASSANDRA_CONTAINER.copyFileFromContainer(
+                    "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
+            String configuration =
+                    new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
+            String patchedConfiguration =
+                    configuration
+                            .replaceAll("request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
+                            .replaceAll(
+                                    "read_request_timeout_in_ms: [0-9]+",
+                                    "read_request_timeout_in_ms: 15000")
+                            .replaceAll(
+                                    "write_request_timeout_in_ms: [0-9]+",
+                                    "write_request_timeout_in_ms: 6000");
+            Files.write(configurationPath, patchedConfiguration.getBytes(StandardCharsets.UTF_8));
+            CASSANDRA_CONTAINER.withConfigurationOverride(
+                    configurationPath.toAbsolutePath().toString());
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to open Cassandra configuration file ", e);
+        }
+    }
+
     @Before
     public void createTable() {
         tableID = random.nextInt(Integer.MAX_VALUE);

[flink] 03/03: [FLINK-25771][connectors][Cassandra][test] Add a test that raiseCassandraRequestsTimeouts indeed changes the configuration inside the container.

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 498ee85f970cb25fb905374cb6c577e9984adc30
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 27 15:05:08 2022 +0100

    [FLINK-25771][connectors][Cassandra][test] Add a test that raiseCassandraRequestsTimeouts indeed changes the configuration inside the container.
    
    (cherry picked from commit 3144fae0dc8f3ef4b2ed6a8da4cdff920bcc4128)
---
 .../cassandra/CassandraConnectorITCase.java        | 33 ++++++++++++++++------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index c1681d5..4c24942 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -70,6 +70,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.Transferable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -86,8 +88,6 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
@@ -111,10 +111,11 @@ public class CassandraConnectorITCase
     private static final String TABLE_POJO = "test";
     private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace";
 
-    @ClassRule
-    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
     @ClassRule
     public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
+
     @Rule public final RetryRule retryRule = new RetryRule();
 
     private static final int PORT = 9042;
@@ -257,21 +258,37 @@ public class CassandraConnectorITCase
                     new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
             String patchedConfiguration =
                     configuration
-                            .replaceAll("request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
+                            .replaceAll(
+                                    "request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
                             .replaceAll(
                                     "read_request_timeout_in_ms: [0-9]+",
                                     "read_request_timeout_in_ms: 15000")
                             .replaceAll(
                                     "write_request_timeout_in_ms: [0-9]+",
                                     "write_request_timeout_in_ms: 6000");
-            Files.write(configurationPath, patchedConfiguration.getBytes(StandardCharsets.UTF_8));
-            CASSANDRA_CONTAINER.withConfigurationOverride(
-                    configurationPath.toAbsolutePath().toString());
+            CASSANDRA_CONTAINER.copyFileToContainer(
+                    Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
+                    "/etc/cassandra/cassandra.yaml");
         } catch (IOException e) {
             throw new RuntimeException("Unable to open Cassandra configuration file ", e);
         }
     }
 
+    @Test
+    public void testRaiseCassandraRequestsTimeouts() throws IOException {
+        // raiseCassandraRequestsTimeouts() was already called in @BeforeClass,
+        // do not change the container conf twice, just assert that it was indeed changed in the
+        // container
+        final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+        CASSANDRA_CONTAINER.copyFileFromContainer(
+                "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
+        final String configuration =
+                new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
+        assertTrue(configuration.contains("request_timeout_in_ms: 30000"));
+        assertTrue(configuration.contains("read_request_timeout_in_ms: 15000"));
+        assertTrue(configuration.contains("write_request_timeout_in_ms: 6000"));
+    }
+
     @Before
     public void createTable() {
         tableID = random.nextInt(Integer.MAX_VALUE);

[flink] 01/03: [FLINK-25147][connectors][Cassandra][test] Use parent test containers version

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0a294e3f7a0129719170222cdff00f997908648f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 25 14:39:36 2022 +0100

    [FLINK-25147][connectors][Cassandra][test] Use parent test containers version
    
    (cherry picked from commit c5483c684e668010a607b5b97618d22be66d1d55)
---
 flink-connectors/flink-connector-cassandra/pom.xml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index c637c31..4b8319a 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -39,7 +39,6 @@ under the License.
 	<properties>
 		<cassandra.version>2.2.5</cassandra.version>
 		<driver.version>3.0.0</driver.version>
-		<cassandra.testContainers.version>1.16.2</cassandra.testContainers.version>
 	</properties>
 
 	<build>
@@ -211,7 +210,6 @@ under the License.
 		<dependency>
 			<groupId>org.testcontainers</groupId>
 			<artifactId>cassandra</artifactId>
-			<version>${cassandra.testContainers.version}</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>