You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/12/29 14:52:48 UTC
[flink] 02/02: [FLINK-25415] Add retries to CassandraConnectorITCase
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 95777d1b7e4be231aaf6cec5a7b26c75206f8c50
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 27 18:13:41 2021 +0100
[FLINK-25415] Add retries to CassandraConnectorITCase
Add 3 retries to all tests and to the startAndInitializeCassandra() method in CassandraConnectorITCase upon NoHostAvailableException, which is raised under load when cluster.connect() is called.
---
.../cassandra/CassandraConnectorITCase.java | 44 +++++++++++++++++++---
1 file changed, 39 insertions(+), 5 deletions(-)
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 78ce453..50a137d 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -46,6 +46,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.types.Row;
import org.apache.flink.util.DockerImageVersions;
@@ -54,14 +56,17 @@ import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.Mapper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.CassandraContainer;
import java.net.InetSocketAddress;
@@ -83,7 +88,8 @@ import static org.junit.Assert.assertTrue;
/** IT cases for all cassandra sinks. */
@SuppressWarnings("serial")
-@Ignore(value = "Flaky test")
+// NoHostAvailableException is raised by Cassandra client under load while connecting to the cluster
+@RetryOnException(times = 3, exception = NoHostAvailableException.class)
public class CassandraConnectorITCase
extends WriteAheadSinkTestBase<
Tuple3<String, Integer, Integer>,
@@ -92,6 +98,12 @@ public class CassandraConnectorITCase
@ClassRule
public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
+ private static final int MAX_CONNECTION_RETRY = 3;
+ private static final long CONNECTION_RETRY_DELAY = 500L;
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+
+ @Rule public final RetryRule retryRule = new RetryRule();
+
private static final int PORT = 9042;
private static Cluster cluster;
@@ -126,8 +138,7 @@ public class CassandraConnectorITCase
private static final String TABLE_NAME_VARIABLE = "$TABLE";
private static final String CREATE_KEYSPACE_QUERY =
"CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
- private static final String DROP_KEYSPACE_QUERY =
- "DROP KEYSPACE IF EXISTS flink ;";
+ private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS flink ;";
private static final String CREATE_TABLE_QUERY =
"CREATE TABLE flink."
+ TABLE_NAME_VARIABLE
@@ -168,7 +179,30 @@ public class CassandraConnectorITCase
// CASSANDRA_CONTAINER#start() already contains retrials
CASSANDRA_CONTAINER.start();
cluster = CASSANDRA_CONTAINER.getCluster();
- session = cluster.connect();
+ int retried = 0;
+ while (retried < MAX_CONNECTION_RETRY) {
+ try {
+ session = cluster.connect();
+ break;
+ } catch (NoHostAvailableException e) {
+ retried++;
+ LOG.debug(
+ "Connection failed with NoHostAvailableException : retry number {}, will retry to connect within {} ms",
+ retried,
+ CONNECTION_RETRY_DELAY);
+ if (retried == MAX_CONNECTION_RETRY) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to connect to Cassandra cluster after %d retries every %d ms",
+ retried, CONNECTION_RETRY_DELAY),
+ e);
+ }
+ try {
+ Thread.sleep(CONNECTION_RETRY_DELAY);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
session.execute(DROP_KEYSPACE_QUERY);
session.execute(CREATE_KEYSPACE_QUERY);
session.execute(