You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/12/29 14:52:48 UTC

[flink] 02/02: [FLINK-25415] Add retries to CassandraConnectorITCase

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95777d1b7e4be231aaf6cec5a7b26c75206f8c50
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 27 18:13:41 2021 +0100

    [FLINK-25415] Add retries to CassandraConnectorITCase
    
    Add 3 retries to all tests and to the startAndInitializeCassandra() method in CassandraConnectorITCase upon NoHostAvailableException, which happens under load when cluster.connect() is called.
---
 .../cassandra/CassandraConnectorITCase.java        | 44 +++++++++++++++++++---
 1 file changed, 39 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 78ce453..50a137d 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -46,6 +46,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.DockerImageVersions;
 
@@ -54,14 +56,17 @@ import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.mapping.Mapper;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
 
 import java.net.InetSocketAddress;
@@ -83,7 +88,8 @@ import static org.junit.Assert.assertTrue;
 
 /** IT cases for all cassandra sinks. */
 @SuppressWarnings("serial")
-@Ignore(value = "Flaky test")
+// NoHostAvailableException is raised by Cassandra client under load while connecting to the cluster
+@RetryOnException(times = 3, exception = NoHostAvailableException.class)
 public class CassandraConnectorITCase
         extends WriteAheadSinkTestBase<
                 Tuple3<String, Integer, Integer>,
@@ -92,6 +98,12 @@ public class CassandraConnectorITCase
     @ClassRule
     public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
 
+    private static final int MAX_CONNECTION_RETRY = 3;
+    private static final long CONNECTION_RETRY_DELAY = 500L;
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+
+    @Rule public final RetryRule retryRule = new RetryRule();
+
     private static final int PORT = 9042;
 
     private static Cluster cluster;
@@ -126,8 +138,7 @@ public class CassandraConnectorITCase
     private static final String TABLE_NAME_VARIABLE = "$TABLE";
     private static final String CREATE_KEYSPACE_QUERY =
             "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-    private static final String DROP_KEYSPACE_QUERY =
-            "DROP KEYSPACE IF EXISTS flink ;";
+    private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS flink ;";
     private static final String CREATE_TABLE_QUERY =
             "CREATE TABLE flink."
                     + TABLE_NAME_VARIABLE
@@ -168,7 +179,30 @@ public class CassandraConnectorITCase
         // CASSANDRA_CONTAINER#start() already contains retrials
         CASSANDRA_CONTAINER.start();
         cluster = CASSANDRA_CONTAINER.getCluster();
-        session = cluster.connect();
+        int retried = 0;
+        while (retried < MAX_CONNECTION_RETRY) {
+            try {
+                session = cluster.connect();
+                break;
+            } catch (NoHostAvailableException e) {
+                retried++;
+                LOG.debug(
+                        "Connection failed with NoHostAvailableException : retry number {}, will retry to connect within {} ms",
+                        retried,
+                        CONNECTION_RETRY_DELAY);
+                if (retried == MAX_CONNECTION_RETRY) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Failed to connect to Cassandra cluster after %d retries every %d ms",
+                                    retried, CONNECTION_RETRY_DELAY),
+                            e);
+                }
+                try {
+                    Thread.sleep(CONNECTION_RETRY_DELAY);
+                } catch (InterruptedException ignored) {
+                }
+            }
+        }
         session.execute(DROP_KEYSPACE_QUERY);
         session.execute(CREATE_KEYSPACE_QUERY);
         session.execute(