You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/12/29 15:02:37 UTC

[flink] branch release-1.13 updated (eace77b -> ac08b44)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from eace77b  [FLINK-25468] Copy SST files if they cannot be hard linked in RocksDBHandle.restoreInstanceDirectoryFromPath
     new 8c7df6c  [FLINK-25147] add keyspace drop because the docker image is reused and modified by the tests.
     new ac08b44  [FLINK-25415] Add retries to CassandraConnectorITCase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/CassandraConnectorITCase.java        | 44 ++++++++++++++++++++--
 1 file changed, 41 insertions(+), 3 deletions(-)

[flink] 01/02: [FLINK-25147] add keyspace drop because the docker image is reused and modified by the tests.

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8c7df6c9a60cd8aed98d670b06a8c606707e5faa
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 27 18:12:56 2021 +0100

    [FLINK-25147] add keyspace drop because the docker image is reused and modified by the tests.
---
 .../streaming/connectors/cassandra/CassandraConnectorITCase.java      | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f6fede9..abfbc96 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -126,6 +126,8 @@ public class CassandraConnectorITCase
     private static final String TABLE_NAME_VARIABLE = "$TABLE";
     private static final String CREATE_KEYSPACE_QUERY =
             "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+    private static final String DROP_KEYSPACE_QUERY =
+            "DROP KEYSPACE IF EXISTS flink ;";
     private static final String CREATE_TABLE_QUERY =
             "CREATE TABLE flink."
                     + TABLE_NAME_VARIABLE
@@ -163,9 +165,11 @@ public class CassandraConnectorITCase
 
     @BeforeClass
     public static void startAndInitializeCassandra() {
+        // CASSANDRA_CONTAINER#start() already contains retrials
         CASSANDRA_CONTAINER.start();
         cluster = CASSANDRA_CONTAINER.getCluster();
         session = cluster.connect();
+        session.execute(DROP_KEYSPACE_QUERY);
         session.execute(CREATE_KEYSPACE_QUERY);
         session.execute(
                 CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));

[flink] 02/02: [FLINK-25415] Add retries to CassandraConnectorITCase

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac08b44f12f55e8b54258d2941dd1a6d0903b630
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 27 18:13:41 2021 +0100

    [FLINK-25415] Add retries to CassandraConnectorITCase
    
    Add 3 retries to all tests and to the startAndInitializeCassandra() method in CassandraConnectorITCase upon NoHostAvailableException, which happens under load when cluster.connect() is called.
---
 .../cassandra/CassandraConnectorITCase.java        | 44 +++++++++++++++++++---
 1 file changed, 39 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index abfbc96..3b4b314 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -46,6 +46,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.types.Row;
 
 import com.datastax.driver.core.Cluster;
@@ -53,14 +55,17 @@ import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.mapping.Mapper;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
 
 import java.net.InetSocketAddress;
@@ -82,7 +87,8 @@ import static org.junit.Assert.assertTrue;
 
 /** IT cases for all cassandra sinks. */
 @SuppressWarnings("serial")
-@Ignore(value = "Flaky test")
+// NoHostAvailableException is raised by Cassandra client under load while connecting to the cluster
+@RetryOnException(times = 3, exception = NoHostAvailableException.class)
 public class CassandraConnectorITCase
         extends WriteAheadSinkTestBase<
                 Tuple3<String, Integer, Integer>,
@@ -92,6 +98,12 @@ public class CassandraConnectorITCase
     public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
 
     private static final String CASSANDRA_3 = "cassandra:3.0";
+    private static final int MAX_CONNECTION_RETRY = 3;
+    private static final long CONNECTION_RETRY_DELAY = 500L;
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+
+    @Rule public final RetryRule retryRule = new RetryRule();
+
     private static final int PORT = 9042;
 
     private static Cluster cluster;
@@ -126,8 +138,7 @@ public class CassandraConnectorITCase
     private static final String TABLE_NAME_VARIABLE = "$TABLE";
     private static final String CREATE_KEYSPACE_QUERY =
             "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-    private static final String DROP_KEYSPACE_QUERY =
-            "DROP KEYSPACE IF EXISTS flink ;";
+    private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS flink ;";
     private static final String CREATE_TABLE_QUERY =
             "CREATE TABLE flink."
                     + TABLE_NAME_VARIABLE
@@ -168,7 +179,30 @@ public class CassandraConnectorITCase
         // CASSANDRA_CONTAINER#start() already contains retrials
         CASSANDRA_CONTAINER.start();
         cluster = CASSANDRA_CONTAINER.getCluster();
-        session = cluster.connect();
+        int retried = 0;
+        while (retried < MAX_CONNECTION_RETRY) {
+            try {
+                session = cluster.connect();
+                break;
+            } catch (NoHostAvailableException e) {
+                retried++;
+                LOG.debug(
+                        "Connection failed with NoHostAvailableException : retry number {}, will retry to connect within {} ms",
+                        retried,
+                        CONNECTION_RETRY_DELAY);
+                if (retried == MAX_CONNECTION_RETRY) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Failed to connect to Cassandra cluster after %d retries every %d ms",
+                                    retried, CONNECTION_RETRY_DELAY),
+                            e);
+                }
+                try {
+                    Thread.sleep(CONNECTION_RETRY_DELAY);
+                } catch (InterruptedException ignored) {
+                }
+            }
+        }
         session.execute(DROP_KEYSPACE_QUERY);
         session.execute(CREATE_KEYSPACE_QUERY);
         session.execute(