You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/01 12:42:02 UTC

[flink] branch release-1.13 updated (91e3acf -> 10a30f6)

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 91e3acf  [FLINK-25818][Docs][Kafka] Add explanation how Kafka Source deals with idleness when parallelism is higher then the number of partitions
     new 58b70f9  [FLINK-25147][connectors][Cassandra][test] Use parent test containers version
     new 1288b75  [FLINK-25771][connectors][Cassandra][test] Raise all read/write/miscellaneous requests timeouts
     new 10a30f6  [FLINK-25771][connectors][Cassandra][test] Add a test that raiseCassandraRequestsTimeouts indeed changes the configuration inside the container.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-connectors/flink-connector-cassandra/pom.xml |  2 -
 .../cassandra/CassandraConnectorITCase.java        | 58 ++++++++++++++++++++--
 2 files changed, 55 insertions(+), 5 deletions(-)

[flink] 02/03: [FLINK-25771][connectors][Cassandra][test] Raise all read/write/miscellaneous requests timeouts

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1288b75f67260e3470ecd752dd86fc75732bc32c
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 25 16:16:04 2022 +0100

    [FLINK-25771][connectors][Cassandra][test] Raise all read/write/miscellaneous requests timeouts
---
 .../cassandra/CassandraConnectorITCase.java        | 41 ++++++++++++++++++++--
 1 file changed, 38 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index e4d83f3..d25688f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -65,11 +65,16 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -80,6 +85,8 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
@@ -96,16 +103,18 @@ public class CassandraConnectorITCase
                 Tuple3<String, Integer, Integer>,
                 CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
 
-    @ClassRule
-    public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
-
     private static final String CASSANDRA_3 = "cassandra:3.0";
     private static final int MAX_CONNECTION_RETRY = 3;
     private static final long CONNECTION_RETRY_DELAY = 500L;
     private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+    private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
     private static final String TABLE_POJO = "test";
     private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace";
 
+    @ClassRule
+    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @ClassRule
+    public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
     @Rule public final RetryRule retryRule = new RetryRule();
 
     private static final int PORT = 9042;
@@ -199,11 +208,13 @@ public class CassandraConnectorITCase
     public static CassandraContainer createCassandraContainer() {
         CassandraContainer cassandra = new CassandraContainer(CASSANDRA_3);
         cassandra.withJmxReporting(false);
+        cassandra.withLogConsumer(LOG_CONSUMER);
         return cassandra;
     }
 
     @BeforeClass
     public static void startAndInitializeCassandra() {
+        raiseCassandraRequestsTimeouts();
         // CASSANDRA_CONTAINER#start() already contains retrials
         CASSANDRA_CONTAINER.start();
         cluster = CASSANDRA_CONTAINER.getCluster();
@@ -237,6 +248,30 @@ public class CassandraConnectorITCase
                 CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
     }
 
+    private static void raiseCassandraRequestsTimeouts() {
+        try {
+            final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+            CASSANDRA_CONTAINER.copyFileFromContainer(
+                    "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
+            String configuration =
+                    new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
+            String patchedConfiguration =
+                    configuration
+                            .replaceAll("request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
+                            .replaceAll(
+                                    "read_request_timeout_in_ms: [0-9]+",
+                                    "read_request_timeout_in_ms: 15000")
+                            .replaceAll(
+                                    "write_request_timeout_in_ms: [0-9]+",
+                                    "write_request_timeout_in_ms: 6000");
+            Files.write(configurationPath, patchedConfiguration.getBytes(StandardCharsets.UTF_8));
+            CASSANDRA_CONTAINER.withConfigurationOverride(
+                    configurationPath.toAbsolutePath().toString());
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to open Cassandra configuration file ", e);
+        }
+    }
+
     @Before
     public void createTable() {
         tableID = random.nextInt(Integer.MAX_VALUE);

[flink] 03/03: [FLINK-25771][connectors][Cassandra][test] Add a test that raiseCassandraRequestsTimeouts indeed changes the configuration inside the container.

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10a30f6d8585d8cfc7d06ed582bc462b9c07b029
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 27 15:05:08 2022 +0100

    [FLINK-25771][connectors][Cassandra][test] Add a test that raiseCassandraRequestsTimeouts indeed changes the configuration inside the container.
    
    (cherry picked from commit 3144fae0dc8f3ef4b2ed6a8da4cdff920bcc4128)
---
 .../cassandra/CassandraConnectorITCase.java        | 33 ++++++++++++++++------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index d25688f..a950204 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -69,6 +69,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.Transferable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -85,8 +87,6 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
@@ -111,10 +111,11 @@ public class CassandraConnectorITCase
     private static final String TABLE_POJO = "test";
     private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace";
 
-    @ClassRule
-    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
     @ClassRule
     public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
+
     @Rule public final RetryRule retryRule = new RetryRule();
 
     private static final int PORT = 9042;
@@ -257,21 +258,37 @@ public class CassandraConnectorITCase
                     new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
             String patchedConfiguration =
                     configuration
-                            .replaceAll("request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
+                            .replaceAll(
+                                    "request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
                             .replaceAll(
                                     "read_request_timeout_in_ms: [0-9]+",
                                     "read_request_timeout_in_ms: 15000")
                             .replaceAll(
                                     "write_request_timeout_in_ms: [0-9]+",
                                     "write_request_timeout_in_ms: 6000");
-            Files.write(configurationPath, patchedConfiguration.getBytes(StandardCharsets.UTF_8));
-            CASSANDRA_CONTAINER.withConfigurationOverride(
-                    configurationPath.toAbsolutePath().toString());
+            CASSANDRA_CONTAINER.copyFileToContainer(
+                    Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
+                    "/etc/cassandra/cassandra.yaml");
         } catch (IOException e) {
             throw new RuntimeException("Unable to open Cassandra configuration file ", e);
         }
     }
 
+    @Test
+    public void testRaiseCassandraRequestsTimeouts() throws IOException {
+        // raiseCassandraRequestsTimeouts() was already called in @BeforeClass,
+        // do not change the container conf twice, just assert that it was indeed changed in the
+        // container
+        final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+        CASSANDRA_CONTAINER.copyFileFromContainer(
+                "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
+        final String configuration =
+                new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
+        assertTrue(configuration.contains("request_timeout_in_ms: 30000"));
+        assertTrue(configuration.contains("read_request_timeout_in_ms: 15000"));
+        assertTrue(configuration.contains("write_request_timeout_in_ms: 6000"));
+    }
+
     @Before
     public void createTable() {
         tableID = random.nextInt(Integer.MAX_VALUE);

[flink] 01/03: [FLINK-25147][connectors][Cassandra][test] Use parent test containers version

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58b70f9bef9d3f5c7df98d5ccb67129745a8a9e6
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 25 14:39:36 2022 +0100

    [FLINK-25147][connectors][Cassandra][test] Use parent test containers version
    
    (cherry picked from commit c5483c684e668010a607b5b97618d22be66d1d55)
---
 flink-connectors/flink-connector-cassandra/pom.xml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index 94c5efa..df44ea2 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -39,7 +39,6 @@ under the License.
 	<properties>
 		<cassandra.version>2.2.5</cassandra.version>
 		<driver.version>3.0.0</driver.version>
-		<cassandra.testContainers.version>1.16.2</cassandra.testContainers.version>
 	</properties>
 
 	<build>
@@ -211,7 +210,6 @@ under the License.
 		<dependency>
 			<groupId>org.testcontainers</groupId>
 			<artifactId>cassandra</artifactId>
-			<version>${cassandra.testContainers.version}</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>