You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/12/29 14:52:46 UTC

[flink] branch release-1.14 updated (f851f58 -> 95777d1)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f851f58  [FLINK-25468] Copy SST files if they cannot be hard linked in RocksDBHandle.restoreInstanceDirectoryFromPath
     new e9b6abc  [FLINK-25147] add keyspace drop because the docker image is reused and modified by the tests.
     new 95777d1  [FLINK-25415] Add retries to CassandraConnectorITCase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/CassandraConnectorITCase.java        | 44 ++++++++++++++++++++--
 1 file changed, 41 insertions(+), 3 deletions(-)

[flink] 01/02: [FLINK-25147] add keyspace drop because the docker image is reused and modified by the tests.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e9b6abc43a527be0b270ccd1fd03280384451b25
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 27 18:12:56 2021 +0100

    [FLINK-25147] add keyspace drop because the docker image is reused and modified by the tests.
---
 .../streaming/connectors/cassandra/CassandraConnectorITCase.java      | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 2d3ec33..78ce453 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -126,6 +126,8 @@ public class CassandraConnectorITCase
     private static final String TABLE_NAME_VARIABLE = "$TABLE";
     private static final String CREATE_KEYSPACE_QUERY =
             "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+    private static final String DROP_KEYSPACE_QUERY =
+            "DROP KEYSPACE IF EXISTS flink ;";
     private static final String CREATE_TABLE_QUERY =
             "CREATE TABLE flink."
                     + TABLE_NAME_VARIABLE
@@ -163,9 +165,11 @@ public class CassandraConnectorITCase
 
     @BeforeClass
     public static void startAndInitializeCassandra() {
+        // CASSANDRA_CONTAINER#start() already contains retrials
         CASSANDRA_CONTAINER.start();
         cluster = CASSANDRA_CONTAINER.getCluster();
         session = cluster.connect();
+        session.execute(DROP_KEYSPACE_QUERY);
         session.execute(CREATE_KEYSPACE_QUERY);
         session.execute(
                 CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));

[flink] 02/02: [FLINK-25415] Add retries to CassandraConnectorITCase

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95777d1b7e4be231aaf6cec5a7b26c75206f8c50
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 27 18:13:41 2021 +0100

    [FLINK-25415] Add retries to CassandraConnectorITCase
    
    Add 3 retries to all tests and to the startAndInitializeCassandra() method in CassandraConnectorITCase upon NoHostAvailableException, which happens under load when cluster.connect() is called.
---
 .../cassandra/CassandraConnectorITCase.java        | 44 +++++++++++++++++++---
 1 file changed, 39 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 78ce453..50a137d 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -46,6 +46,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.DockerImageVersions;
 
@@ -54,14 +56,17 @@ import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.mapping.Mapper;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
 
 import java.net.InetSocketAddress;
@@ -83,7 +88,8 @@ import static org.junit.Assert.assertTrue;
 
 /** IT cases for all cassandra sinks. */
 @SuppressWarnings("serial")
-@Ignore(value = "Flaky test")
+// NoHostAvailableException is raised by Cassandra client under load while connecting to the cluster
+@RetryOnException(times = 3, exception = NoHostAvailableException.class)
 public class CassandraConnectorITCase
         extends WriteAheadSinkTestBase<
                 Tuple3<String, Integer, Integer>,
@@ -92,6 +98,12 @@ public class CassandraConnectorITCase
     @ClassRule
     public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
 
+    private static final int MAX_CONNECTION_RETRY = 3;
+    private static final long CONNECTION_RETRY_DELAY = 500L;
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+
+    @Rule public final RetryRule retryRule = new RetryRule();
+
     private static final int PORT = 9042;
 
     private static Cluster cluster;
@@ -126,8 +138,7 @@ public class CassandraConnectorITCase
     private static final String TABLE_NAME_VARIABLE = "$TABLE";
     private static final String CREATE_KEYSPACE_QUERY =
             "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-    private static final String DROP_KEYSPACE_QUERY =
-            "DROP KEYSPACE IF EXISTS flink ;";
+    private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS flink ;";
     private static final String CREATE_TABLE_QUERY =
             "CREATE TABLE flink."
                     + TABLE_NAME_VARIABLE
@@ -168,7 +179,30 @@ public class CassandraConnectorITCase
         // CASSANDRA_CONTAINER#start() already contains retrials
         CASSANDRA_CONTAINER.start();
         cluster = CASSANDRA_CONTAINER.getCluster();
-        session = cluster.connect();
+        int retried = 0;
+        while (retried < MAX_CONNECTION_RETRY) {
+            try {
+                session = cluster.connect();
+                break;
+            } catch (NoHostAvailableException e) {
+                retried++;
+                LOG.debug(
+                        "Connection failed with NoHostAvailableException : retry number {}, will retry to connect within {} ms",
+                        retried,
+                        CONNECTION_RETRY_DELAY);
+                if (retried == MAX_CONNECTION_RETRY) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Failed to connect to Cassandra cluster after %d retries every %d ms",
+                                    retried, CONNECTION_RETRY_DELAY),
+                            e);
+                }
+                try {
+                    Thread.sleep(CONNECTION_RETRY_DELAY);
+                } catch (InterruptedException ignored) {
+                }
+            }
+        }
         session.execute(DROP_KEYSPACE_QUERY);
         session.execute(CREATE_KEYSPACE_QUERY);
         session.execute(