You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "zentol (via GitHub)" <gi...@apache.org> on 2023/03/14 11:16:38 UTC

[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

zentol commented on code in PR #3:
URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1135383543


##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.rmi.server.RMISocketFactory;
+import java.util.Map;
+
+/**
+ * Junit test environment that contains everything needed at the test suite level: testContainer
+ * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup).
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+    private static final String STORAGE_SERVICE_MBEAN =
+            "org.apache.cassandra.db:type=StorageService";
+    private static final long FLUSH_MEMTABLES_DELAY =
+            30_000L; // updating flushing mem table to SS tables is long, it is the minimum delay.
+
+    static final String KEYSPACE = "flink";
+
+    private static final String CREATE_KEYSPACE_QUERY =
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+
+    static final String SPLITS_TABLE = "flinksplits";
+    private static final String CREATE_SPLITS_TABLE_QUERY =
+            "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int PRIMARY KEY, counter int);";
+    private static final String INSERT_INTO_FLINK_SPLITS =
+            "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)";
+    private static final int NB_SPLITS_RECORDS = 1000;
+
+    @Container private final CassandraContainer cassandraContainer;
+    private final String dockerHostIp;
+    private final int jmxPort;
+
+    private Cluster cluster;
+    private Session session;
+    private ClusterBuilder clusterBuilder;
+
+    public CassandraTestEnvironment() {
+        try (ServerSocket s = new ServerSocket(0)) {
+            // use fixed free random port for JMX
+            jmxPort = s.getLocalPort();
+            // resolve IP of docker host to establish JMX connection
+            dockerHostIp = InetAddress.getLocalHost().getHostAddress();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
+        // use a fixed port mapping for JMX, it doesn't work well with mapped ports
+        cassandraContainer.setPortBindings(ImmutableList.of(jmxPort + ":" + jmxPort));
+        // more generous timeouts and remote JMX configuration
+        addJavaOpts(
+                cassandraContainer,
+                "-Dcassandra.request_timeout_in_ms=30000",
+                "-Dcassandra.read_request_timeout_in_ms=15000",
+                "-Dcassandra.write_request_timeout_in_ms=6000",
+                "-Dcassandra.jmx.remote.port=" + jmxPort,
+                "-Dcom.sun.management.jmxremote.rmi.port=" + jmxPort,
+                "-Djava.rmi.server.hostname=" + dockerHostIp,
+                "-Dcom.sun.management.jmxremote.host=" + dockerHostIp);
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        startEnv();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        stopEnv();
+    }
+
+    private static void addJavaOpts(GenericContainer<?> container, String... opts) {
+        String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", "");
+        container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, " "));
+    }
+
+    private void startEnv() throws Exception {
+        // configure container start to wait until cassandra is ready to receive queries
+        cassandraContainer.waitingFor(new CassandraQueryWaitStrategy());
+        // start with retrials
+        cassandraContainer.start();
+        cassandraContainer.followOutput(
+                new Slf4jLogConsumer(LOG),
+                OutputFrame.OutputType.END,
+                OutputFrame.OutputType.STDERR,
+                OutputFrame.OutputType.STDOUT);
+
+        cluster = cassandraContainer.getCluster();
+        clusterBuilder =
+                createBuilderWithConsistencyLevel(
+                        ConsistencyLevel.ONE,
+                        cassandraContainer.getHost(),
+                        cassandraContainer.getMappedPort(CQL_PORT));
+
+        session = cluster.connect();
+        session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+        // create a dedicated table for split size tests (to avoid having to flush with each test)
+        insertTestDataForSplitSizeTests();
+    }
+
+    private void insertTestDataForSplitSizeTests() throws Exception {
+        session.execute(requestWithTimeout(CREATE_SPLITS_TABLE_QUERY));
+        for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
+            session.execute(requestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, i)));
+        }
+        flushMemTables(SPLITS_TABLE);
+    }
+
+    private void stopEnv() {
+
+        if (session != null) {
+            session.close();
+        }
+        if (cluster != null) {
+            cluster.close();
+        }
+        cassandraContainer.stop();
+    }
+
+    private ClusterBuilder createBuilderWithConsistencyLevel(
+            ConsistencyLevel consistencyLevel, String host, int port) {
+        return new ClusterBuilder() {
+            @Override
+            protected Cluster buildCluster(Cluster.Builder builder) {
+                return builder.addContactPointsWithPorts(new InetSocketAddress(host, port))
+                        .withQueryOptions(
+                                new QueryOptions()
+                                        .setConsistencyLevel(consistencyLevel)
+                                        .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+                        .withSocketOptions(
+                                new SocketOptions()
+                                        // default timeout x 3
+                                        .setConnectTimeoutMillis(15000)
+                                        // default timeout x3 and higher than
+                                        // request_timeout_in_ms at the cluster level
+                                        .setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
+                        .withoutJMXReporting()
+                        .withoutMetrics()
+                        .build();
+            }
+        };
+    }
+
+    /**
+     * Force the flush of cassandra memTables to SSTables in order to update size_estimates. This
+     * flush method is what official Cassandra NoteTool does. It is needed for the tests because we
+     * just inserted records, we need to force cassandra to update size_estimates system table.
+     */

Review Comment:
   ```suggestion
   ```



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.rmi.server.RMISocketFactory;
+import java.util.Map;
+
+/**
+ * Junit test environment that contains everything needed at the test suite level: testContainer
+ * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup).
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+    private static final String STORAGE_SERVICE_MBEAN =
+            "org.apache.cassandra.db:type=StorageService";
+    private static final long FLUSH_MEMTABLES_DELAY =
+            30_000L; // updating flushing mem table to SS tables is long, it is the minimum delay.
+
+    static final String KEYSPACE = "flink";
+
+    private static final String CREATE_KEYSPACE_QUERY =
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+
+    static final String SPLITS_TABLE = "flinksplits";
+    private static final String CREATE_SPLITS_TABLE_QUERY =
+            "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int PRIMARY KEY, counter int);";
+    private static final String INSERT_INTO_FLINK_SPLITS =
+            "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)";
+    private static final int NB_SPLITS_RECORDS = 1000;
+
+    @Container private final CassandraContainer cassandraContainer;
+    private final String dockerHostIp;
+    private final int jmxPort;
+
+    private Cluster cluster;
+    private Session session;
+    private ClusterBuilder clusterBuilder;
+
+    public CassandraTestEnvironment() {
+        try (ServerSocket s = new ServerSocket(0)) {
+            // use fixed free random port for JMX
+            jmxPort = s.getLocalPort();
+            // resolve IP of docker host to establish JMX connection
+            dockerHostIp = InetAddress.getLocalHost().getHostAddress();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+

Review Comment:
   ```suggestion
   ```
   This wasn't safe anyway.



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.rmi.server.RMISocketFactory;
+import java.util.Map;
+
+/**
+ * Junit test environment that contains everything needed at the test suite level: testContainer
+ * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup).
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+    private static final String STORAGE_SERVICE_MBEAN =
+            "org.apache.cassandra.db:type=StorageService";

Review Comment:
   ```suggestion
   ```



##########
flink-connector-cassandra/pom.xml:
##########
@@ -180,7 +188,17 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
-		<!-- Test dependencies -->
+	<!-- Test dependencies -->
+
+		<!-- for JMX MBean invocation to flush mem tables.
+		Alternative is cassandra-all artifact which is huge.
+		palantir-cassandra-jmx-api is ASF v2 -->
+		<dependency>
+			<groupId>com.palantir.cassandra</groupId>
+			<artifactId>palantir-cassandra-jmx-api</artifactId>
+			<version>${palantir-cassandra-jmx-api.version}</version>
+			<scope>test</scope>
+		</dependency>

Review Comment:
   ```suggestion
   ```



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.rmi.server.RMISocketFactory;
+import java.util.Map;
+
+/**
+ * Junit test environment that contains everything needed at the test suite level: testContainer
+ * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup).
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+    private static final String STORAGE_SERVICE_MBEAN =
+            "org.apache.cassandra.db:type=StorageService";
+    private static final long FLUSH_MEMTABLES_DELAY =
+            30_000L; // updating flushing mem table to SS tables is long, it is the minimum delay.
+
+    static final String KEYSPACE = "flink";
+
+    private static final String CREATE_KEYSPACE_QUERY =
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+
+    static final String SPLITS_TABLE = "flinksplits";
+    private static final String CREATE_SPLITS_TABLE_QUERY =
+            "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int PRIMARY KEY, counter int);";
+    private static final String INSERT_INTO_FLINK_SPLITS =
+            "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)";
+    private static final int NB_SPLITS_RECORDS = 1000;
+
+    @Container private final CassandraContainer cassandraContainer;
+    private final String dockerHostIp;
+    private final int jmxPort;
+
+    private Cluster cluster;
+    private Session session;
+    private ClusterBuilder clusterBuilder;
+
+    public CassandraTestEnvironment() {
+        try (ServerSocket s = new ServerSocket(0)) {
+            // use fixed free random port for JMX
+            jmxPort = s.getLocalPort();
+            // resolve IP of docker host to establish JMX connection
+            dockerHostIp = InetAddress.getLocalHost().getHostAddress();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
+        // use a fixed port mapping for JMX, it doesn't work well with mapped ports
+        cassandraContainer.setPortBindings(ImmutableList.of(jmxPort + ":" + jmxPort));
+        // more generous timeouts and remote JMX configuration
+        addJavaOpts(
+                cassandraContainer,
+                "-Dcassandra.request_timeout_in_ms=30000",
+                "-Dcassandra.read_request_timeout_in_ms=15000",
+                "-Dcassandra.write_request_timeout_in_ms=6000",
+                "-Dcassandra.jmx.remote.port=" + jmxPort,
+                "-Dcom.sun.management.jmxremote.rmi.port=" + jmxPort,
+                "-Djava.rmi.server.hostname=" + dockerHostIp,
+                "-Dcom.sun.management.jmxremote.host=" + dockerHostIp);
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        startEnv();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        stopEnv();
+    }
+
+    private static void addJavaOpts(GenericContainer<?> container, String... opts) {
+        String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", "");
+        container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, " "));
+    }
+
+    private void startEnv() throws Exception {
+        // configure container start to wait until cassandra is ready to receive queries
+        cassandraContainer.waitingFor(new CassandraQueryWaitStrategy());
+        // start with retrials
+        cassandraContainer.start();
+        cassandraContainer.followOutput(
+                new Slf4jLogConsumer(LOG),
+                OutputFrame.OutputType.END,
+                OutputFrame.OutputType.STDERR,
+                OutputFrame.OutputType.STDOUT);
+
+        cluster = cassandraContainer.getCluster();
+        clusterBuilder =
+                createBuilderWithConsistencyLevel(
+                        ConsistencyLevel.ONE,
+                        cassandraContainer.getHost(),
+                        cassandraContainer.getMappedPort(CQL_PORT));
+
+        session = cluster.connect();
+        session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+        // create a dedicated table for split size tests (to avoid having to flush with each test)
+        insertTestDataForSplitSizeTests();
+    }
+
+    private void insertTestDataForSplitSizeTests() throws Exception {
+        session.execute(requestWithTimeout(CREATE_SPLITS_TABLE_QUERY));
+        for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
+            session.execute(requestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, i)));
+        }
+        flushMemTables(SPLITS_TABLE);
+    }
+
+    private void stopEnv() {
+
+        if (session != null) {
+            session.close();
+        }
+        if (cluster != null) {
+            cluster.close();
+        }
+        cassandraContainer.stop();
+    }
+
+    private ClusterBuilder createBuilderWithConsistencyLevel(
+            ConsistencyLevel consistencyLevel, String host, int port) {
+        return new ClusterBuilder() {
+            @Override
+            protected Cluster buildCluster(Cluster.Builder builder) {
+                return builder.addContactPointsWithPorts(new InetSocketAddress(host, port))
+                        .withQueryOptions(
+                                new QueryOptions()
+                                        .setConsistencyLevel(consistencyLevel)
+                                        .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+                        .withSocketOptions(
+                                new SocketOptions()
+                                        // default timeout x 3
+                                        .setConnectTimeoutMillis(15000)
+                                        // default timeout x3 and higher than
+                                        // request_timeout_in_ms at the cluster level
+                                        .setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
+                        .withoutJMXReporting()
+                        .withoutMetrics()
+                        .build();
+            }
+        };
+    }
+
+    /**
+     * Force the flush of cassandra memTables to SSTables in order to update size_estimates. This
+     * flush method is what official Cassandra NoteTool does. It is needed for the tests because we
+     * just inserted records, we need to force cassandra to update size_estimates system table.
+     */
+    void flushMemTables(String table) throws Exception {
+        JMXServiceURL url = new JMXServiceURL(String.format(JMX_URL, dockerHostIp, jmxPort));
+        Map<String, Object> env =
+                ImmutableMap.of(
+                        "com.sun.jndi.rmi.factory.socket",
+                        RMISocketFactory.getDefaultSocketFactory()); // connection without ssl
+        try (JMXConnector jmxConnector = JMXConnectorFactory.connect(url, env)) {
+            MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
+            ObjectName objectName = new ObjectName(STORAGE_SERVICE_MBEAN);
+            StorageServiceMBean mBeanProxy =
+                    JMX.newMBeanProxy(mBeanServerConnection, objectName, StorageServiceMBean.class);
+            mBeanProxy.forceKeyspaceFlush(KEYSPACE, table);
+        }

Review Comment:
   
   ```suggestion
           cassandraContainer.execInContainer("nodetool",
                   "flush",
                   KEYSPACE,
                   table);
   ```



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.rmi.server.RMISocketFactory;
+import java.util.Map;
+
+/**
+ * Junit test environment that contains everything needed at the test suite level: testContainer
+ * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup).
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+    private static final String STORAGE_SERVICE_MBEAN =
+            "org.apache.cassandra.db:type=StorageService";
+    private static final long FLUSH_MEMTABLES_DELAY =
+            30_000L; // updating flushing mem table to SS tables is long, it is the minimum delay.
+
+    static final String KEYSPACE = "flink";
+
+    private static final String CREATE_KEYSPACE_QUERY =
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+
+    static final String SPLITS_TABLE = "flinksplits";
+    private static final String CREATE_SPLITS_TABLE_QUERY =
+            "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int PRIMARY KEY, counter int);";
+    private static final String INSERT_INTO_FLINK_SPLITS =
+            "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)";
+    private static final int NB_SPLITS_RECORDS = 1000;
+
+    @Container private final CassandraContainer cassandraContainer;
+    private final String dockerHostIp;
+    private final int jmxPort;
+
+    private Cluster cluster;
+    private Session session;
+    private ClusterBuilder clusterBuilder;
+
+    public CassandraTestEnvironment() {
+        try (ServerSocket s = new ServerSocket(0)) {
+            // use fixed free random port for JMX
+            jmxPort = s.getLocalPort();
+            // resolve IP of docker host to establish JMX connection
+            dockerHostIp = InetAddress.getLocalHost().getHostAddress();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
+        // use a fixed port mapping for JMX, it doesn't work well with mapped ports
+        cassandraContainer.setPortBindings(ImmutableList.of(jmxPort + ":" + jmxPort));
+        // more generous timeouts and remote JMX configuration
+        addJavaOpts(
+                cassandraContainer,
+                "-Dcassandra.request_timeout_in_ms=30000",
+                "-Dcassandra.read_request_timeout_in_ms=15000",
+                "-Dcassandra.write_request_timeout_in_ms=6000",
+                "-Dcassandra.jmx.remote.port=" + jmxPort,
+                "-Dcom.sun.management.jmxremote.rmi.port=" + jmxPort,
+                "-Djava.rmi.server.hostname=" + dockerHostIp,
+                "-Dcom.sun.management.jmxremote.host=" + dockerHostIp);

Review Comment:
   ```suggestion
                   "-Dcassandra.write_request_timeout_in_ms=6000");
   ```



##########
flink-connector-cassandra/pom.xml:
##########
@@ -53,6 +53,7 @@ under the License.
 		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
 		<driver.version>3.11.2</driver.version>
 		<guava.version>19.0</guava.version>
+		<palantir-cassandra-jmx-api.version>2.2.18-1.108.0</palantir-cassandra-jmx-api.version>

Review Comment:
   ```suggestion
   ```



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.rmi.server.RMISocketFactory;
+import java.util.Map;
+
+/**
+ * Junit test environment that contains everything needed at the test suite level: testContainer
+ * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup).
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+    private static final String STORAGE_SERVICE_MBEAN =
+            "org.apache.cassandra.db:type=StorageService";
+    private static final long FLUSH_MEMTABLES_DELAY =
+            30_000L; // updating flushing mem table to SS tables is long, it is the minimum delay.
+
+    static final String KEYSPACE = "flink";
+
+    private static final String CREATE_KEYSPACE_QUERY =
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+
+    static final String SPLITS_TABLE = "flinksplits";
+    private static final String CREATE_SPLITS_TABLE_QUERY =
+            "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int PRIMARY KEY, counter int);";
+    private static final String INSERT_INTO_FLINK_SPLITS =
+            "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)";
+    private static final int NB_SPLITS_RECORDS = 1000;
+
+    @Container private final CassandraContainer cassandraContainer;
+    private final String dockerHostIp;
+    private final int jmxPort;

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org