Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/26 14:53:59 UTC

[GitHub] [flink] echauchot opened a new pull request, #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

echauchot opened a new pull request, #19586:
URL: https://github.com/apache/flink/pull/19586

   ## What is the purpose of the change
   
   Make the current Flink Cassandra connector support all the latest Cassandra versions supported by Apache Cassandra (3.0.26, 3.11.12 and 4.0.3 as of now).
   
   ## Brief change log
   
   **Versions**: I was able to address all Cassandra versions with the Cassandra 4.x lib and the 3.x driver, without any prod code change. The 4.x driver is a big refactor that completely changes the driver API, so I preferred sticking to the latest 3.x driver, which can also talk to Cassandra 4.x.
   
   **Bug uncovered**: Migrating to Cassandra 4.x uncovered a race condition in the tests between the asynchronous writes and the JUnit assertions. So I introduced a `sink#setSynchronousWrites()` method, defaulting to false for backward compatibility, and configured all the sinks in the tests to write synchronously. That way we are sure that, in the tests, writes are finished before asserting on their result (no more race condition).
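
   A minimal sketch of how a test can use the new setter (hedged: `Pojo`, `builder` and `WRITE_TIMEOUT` are stand-ins for the actual test fixtures, and the surrounding test wiring is omitted):

   ```
   // Hedged sketch, not the exact PR code: enable synchronous writes in a test so
   // the assertion below only runs after the row is actually written.
   CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
   sink.setSynchronousWrites(WRITE_TIMEOUT); // block each invoke() for up to WRITE_TIMEOUT seconds
   sink.open(new Configuration());
   sink.invoke(new Pojo(UUID.randomUUID().toString(), 10, 0));
   sink.close();
   // the row is now visible, so the read-back assertion cannot race with the write
   ```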
   
   **Tests**: I first tried to use parameterized tests, but the parameters are injected after the Testcontainers ClassRule that creates the container is evaluated. As this ClassRule is mandatory for the Cassandra testcontainer, I could not use parameterized tests. So I switched to a hierarchy of classes with an ITCase per Cassandra version: the subclasses only manage the container, and the actual tests remain in CassandraConnectorITCase, which was renamed (and made abstract) because it is no longer the IT entry point.
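
   Roughly, each version-specific subclass then looks like the sketch below (hedged: the `CassandraContainer` wiring is an assumption; the real classes are shown in the review diffs further down):

   ```
   // Hedged sketch of one per-version ITCase: the subclass only owns the container,
   // all test methods are inherited from the abstract base class.
   public class Cassandra30ConnectorITCase extends CassandraConnectorBaseTest {

       @ClassRule
       public static final CassandraContainer CASSANDRA_CONTAINER =
               new CassandraContainer(DockerImageVersions.CASSANDRA_3_0);
   }
   ```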
   
   
   **Commit history**: I think the history is clean: support for all Cassandra versions is isolated in a single commit, synchronous write support for every sink is isolated in its own commit for easy revert, and the final commit fixes the race condition in all the tests.
   
   
   ## Verifying this change
   
   This change adds tests and can be verified by running Cassandra30ConnectorITCase (Cassandra 3.0.26), Cassandra311ConnectorITCase (Cassandra 3.11.12) and Cassandra40ConnectorITCase (Cassandra 4.0.3).
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): yes (upgrade)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: (yes / no / don't know): no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes: the ability to perform synchronous writes with a timeout
     - If yes, how is the feature documented? javadoc
   
   R: @MartijnVisser 
   CC: @zentol 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r861897602


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   I created [this ticket](https://issues.apache.org/jira/browse/FLINK-27457) to track the flush task in the output formats 





[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1121062218

   @zentol I dropped the flush commits, so the race condition is back. Waiting for the other PR to be merged before rebasing this one.




[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1156526744

   Done PTAL
   




[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r895688621


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>
+		<guava.version>19.0</guava.version>

Review Comment:
   :+1: good catch





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r895746196


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -140,11 +142,11 @@ under the License.
 			<artifactId>scala-library</artifactId>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
+<!--		<dependency>

Review Comment:
   No, actually it is a test leftover: I wanted to know the Guava version of the new driver's transitive dependency and forgot to re-enable the dependency afterwards. I prefer having the explicit dep, as the code refers to it, rather than relying on the transitive dep.





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r896961549


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>

Review Comment:
   done





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r895748559


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java:
##########
@@ -106,6 +106,8 @@
 
     private static final int MAX_CONNECTION_RETRY = 3;
     private static final long CONNECTION_RETRY_DELAY = 500L;
+    private static final int WRITE_TIMEOUT = 10;

Review Comment:
   conflict resolution bug during rebase, thanks for catching it.





[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r896540249


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -140,11 +142,11 @@ under the License.
 			<artifactId>scala-library</artifactId>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
+<!--		<dependency>

Review Comment:
   ok 👍 





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r896832924


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>

Review Comment:
   @zentol a question related to netty: I went through [FLINK-8295](https://issues.apache.org/jira/browse/FLINK-8295), and in that ticket we relocate netty to `com.datastax.shaded.netty` so that the Cassandra driver can find it. But it would also have found it under the default `io.netty` namespace, so why can't we just keep the netty transitive dep as it is? Is it conflicting with netty elsewhere in Flink? (I checked that the version used in Cassandra is the same as in the Flink parent.)





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r859930212


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorBaseTest.java:
##########
@@ -220,7 +225,7 @@ private static Class<? extends Pojo> annotatePojoWithTable(String keyspace, Stri
     }
 
     @NotNull
-    private static Table createTableAnnotation(String keyspace, String tableName) {
+    private Table createTableAnnotation(String keyspace, String tableName) {

Review Comment:
   Because it is called only from the `annotatePojoWithTable` method, and that method is called only from the test methods, it should not have been static in the first place. So I'm cleaning this up.





[GitHub] [flink] zentol commented on pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1156372897

   @flinkbot run azure




[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r893627065


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java:
##########
@@ -106,6 +106,8 @@
 
     private static final int MAX_CONNECTION_RETRY = 3;
     private static final long CONNECTION_RETRY_DELAY = 500L;
+    private static final int WRITE_TIMEOUT = 10;

Review Comment:
   unused



##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>

Review Comment:
   The dependency tree got quite a bit larger.
   
   There's some native code business (jnr-ffi/jnr-posix) which relies on asm, and an added jackson dependency.
   We could think about excluding `jnr-ffi`; afaict it is optional and users who want to use it can do so.
   
   As for jackson, we should consider bundling & relocating it. But first we should figure out what it is used for anyway; specifically, whether it is a purely internal dependency or also exposed somewhere in the API.



##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>
+		<guava.version>19.0</guava.version>

Review Comment:
   unused



##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -140,11 +142,11 @@ under the License.
 			<artifactId>scala-library</artifactId>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
+<!--		<dependency>

Review Comment:
   remove it instead



##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>

Review Comment:
   FYI cassandra also publishes a shaded jar of cassandra-core which already bundles netty; could use that to reduce some complexity on our side.



##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>

Review Comment:
   unused





[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860673272


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Cassandra40ConnectorITCase.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.testcontainers.containers.CassandraContainer;
+
+/** Class for IT cases for all Cassandra sinks tested on Cassandra 4.0.x . */
+public class Cassandra40ConnectorITCase extends CassandraConnectorBaseTest {

Review Comment:
   One could argue that since the driver says that it supports it, it should be good enough to test a single version.
   We also don't test our Kafka connector against 10 different kafka versions; we rely on the compatibility guarantees provided by the client.





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r859596704


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -37,8 +37,10 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.1</driver.version>
 		<guava.version>18.0</guava.version>

Review Comment:
   Actually I don't think this explicit dependency was needed in the first place. Indeed cassandra driver 3.0.0 had `com.google.guava:guava:jar:16.0.1:compile` as a transitive dep and our code depends on the shaded guava (`import org.apache.flink.shaded.guava30.com.google.common.base.Strings;`). Removing it.





[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] [FLINK-27457] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions and support flush mechanism on outputFormats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1121051329

   > > It could be; in that case the uncovered race condition won't be fixed. So the output formats tests would fail on cassandra 4.0 between the two PR merges. I guess if we merge the two PRs almost at the same time, it won't be a problem
   > 
   > We would naturally first merge the flushing changes, and then rebase this PR on top of that.
   
   I just opened: https://github.com/apache/flink/pull/19680




[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1151108256

   @zentol just rebased on master PTAL.




[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r896866967


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>

Review Comment:
   >  Is it conflicting with netty elsewhere in flink ?
   
   Not necessarily with Flink (nowadays, since flink-shaded-netty exists and akka is isolated), but netty is quite common and used by many systems (including several other connectors afaik), hence why we relocate it.





[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1155342829

   done @zentol @absurdfarce thanks for your review guys !




[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860674923


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   To give some more details about the uncovered race condition I mentioned: when I upgraded the versions, `CassandraConnectorITCase#testCassandraBatchPojoFormat` started to fail, claiming that no records were written. So the problem was not in `CassandraSinkBase` but in `CassandraPojoOutputFormat`: the Cassandra session was closed before the asynchronous writes had finished, leading to a Cassandra exception saying that the session is already closed. That is why I added the option for synchronous writes on `CassandraPojoOutputFormat`, so that `sink.writeRecord(pojo)` becomes a blocking call and `sink.close()` is not called until the write is actually done. I then generalized this option to all sinks for consistency. So, in short, there was no problem with `CassandraSinkBase` and its subclasses; the `flush` behavior works just fine.
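
   A rough sketch of the blocking write path described above (hedged: the `synchronousWrites`/`timeout` fields, the exception handling and the callback variable are illustrative, not the exact PR code):

   ```
   // Hedged sketch for CassandraPojoOutputFormat: when synchronousWrites is enabled,
   // writeRecord() blocks on the async future, so close() can no longer shut the
   // session down before the insert has completed.
   @Override
   public void writeRecord(OUT record) throws IOException {
       ListenableFuture<Void> result = mapper.saveAsync(record);
       if (synchronousWrites) {
           try {
               result.get(timeout, TimeUnit.SECONDS); // wait for the write to finish
           } catch (Exception e) {
               throw new IOException("Cassandra write failed or timed out", e);
           }
       } else {
           Futures.addCallback(result, callback); // original asynchronous path
       }
   }
   ```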





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860810419


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -140,7 +152,12 @@ public void invoke(IN value) throws Exception {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        if (synchronousWrites) {
+            result.get(timeout, TimeUnit.SECONDS);
+            semaphore.release();

Review Comment:
   thanks, `semaphore.release();` should indeed be in a finally block
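
   Something along these lines (a hedged sketch matching the variable names in the diff excerpt above, not the final code):

   ```
   // Release the semaphore even when the blocking get(...) throws or times out.
   if (synchronousWrites) {
       try {
           result.get(timeout, TimeUnit.SECONDS);
       } finally {
           semaphore.release();
       }
   } else {
       Futures.addCallback(result, callback);
   }
   ```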





[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r859979874


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorBaseTest.java:
##########
@@ -220,7 +225,7 @@ private static Class<? extends Pojo> annotatePojoWithTable(String keyspace, Stri
     }
 
     @NotNull
-    private static Table createTableAnnotation(String keyspace, String tableName) {
+    private Table createTableAnnotation(String keyspace, String tableName) {

Review Comment:
   Why shouldn't it have been static in the first place? The general rule is "if it can be static, make it static".





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r859924128


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorBaseTest.java:
##########
@@ -97,29 +94,33 @@
 import static org.hamcrest.Matchers.samePropertyValuesAs;
 import static org.junit.Assert.assertTrue;
 
-/** IT cases for all cassandra sinks. */
+/**
+ * Base class for IT cases for all Cassandra sinks. This class relies on Cassandra testContainer
+ * that needs to use a ClassRule. Parametrized tests to not work with ClassRules so the actual
+ * testCase classes define the tested version and manage the container.
+ */
 @SuppressWarnings("serial")
 // NoHostAvailableException is raised by Cassandra client under load while connecting to the cluster
 @RetryOnException(times = 3, exception = NoHostAvailableException.class)
-public class CassandraConnectorITCase
+public abstract class CassandraConnectorBaseTest

Review Comment:
   :+1: 





[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860859775


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   yes. I don't think you'd ever want to enable that in production (which is what the API should be limited to).





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860683628


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Cassandra40ConnectorITCase.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.testcontainers.containers.CassandraContainer;
+
+/** Class for IT cases for all Cassandra sinks tested on Cassandra 4.0.x . */
+public class Cassandra40ConnectorITCase extends CassandraConnectorBaseTest {

Review Comment:
   This makes total sense to me considering that there is neither prod code difference nor dependencies difference. I can definitely remove all the sub-classes and put all the test code back to `CassandraConnectorITCase` with a static 4.0.3 container. This will make the test code way simpler. 





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r895746746


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>

Review Comment:
   :+1: 





[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r858837814


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorBaseTest.java:
##########
@@ -220,7 +225,7 @@ private static Class<? extends Pojo> annotatePojoWithTable(String keyspace, Stri
     }
 
     @NotNull
-    private static Table createTableAnnotation(String keyspace, String tableName) {
+    private Table createTableAnnotation(String keyspace, String tableName) {

Review Comment:
   Why is this no longer static?



##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorBaseTest.java:
##########
@@ -97,29 +94,33 @@
 import static org.hamcrest.Matchers.samePropertyValuesAs;
 import static org.junit.Assert.assertTrue;
 
-/** IT cases for all cassandra sinks. */
+/**
+ * Base class for IT cases for all Cassandra sinks. This class relies on Cassandra testContainer
+ * that needs to use a ClassRule. Parametrized tests to not work with ClassRules so the actual
+ * testCase classes define the tested version and manage the container.
+ */
 @SuppressWarnings("serial")
 // NoHostAvailableException is raised by Cassandra client under load while connecting to the cluster
 @RetryOnException(times = 3, exception = NoHostAvailableException.class)
-public class CassandraConnectorITCase
+public abstract class CassandraConnectorBaseTest

Review Comment:
   This isn't a test, and hence the name shouldn't end in "Test". Switch "Base" and "Test".



##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -61,7 +63,8 @@ under the License.
 							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
 							<artifactSet>
 								<includes>
-									<include>com.datastax.cassandra:cassandra-driver-core</include>
+									<!--driver was relocated by datastax-->

Review Comment:
   ```suggestion
   ```



##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -140,7 +152,12 @@ public void invoke(IN value) throws Exception {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        if (synchronousWrites) {
+            result.get(timeout, TimeUnit.SECONDS);
+            semaphore.release();

Review Comment:
   If this times out the semaphore is never released.



##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Cassandra30ConnectorITCase.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.testcontainers.containers.CassandraContainer;
+
+/** Class for IT cases for all Cassandra sinks tested on Cassandra 3.0.x . */
+public class Cassandra30ConnectorITCase extends CassandraConnectorBaseTest {
+    private static final String TESTED_VERSION = DockerImageVersions.CASSANDRA_3_0;
+
+    @ClassRule
+    public static final CassandraContainer CASSANDRA_CONTAINER =

Review Comment:
   If we're going with several ITCases I'd be tempted to only have this in the sub-classes:
   
   ```
       @BeforeClass
       public static void setup() {
           startAndInitializeCassandra(<version>);
       }
   ```
   and have startAndInitialize also create the container, and manually manage the lifecycle of the cassandra container instead of relying on `ClassRule`
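
   In other words, the base class would own the container lifecycle itself, roughly like this (hedged sketch; apart from `startAndInitializeCassandra`, all names are assumptions):

   ```
   // Hypothetical sketch of the manual lifecycle management suggested above.
   private static CassandraContainer cassandraContainer;

   protected static void startAndInitializeCassandra(String dockerImageName) {
       cassandraContainer = new CassandraContainer(dockerImageName);
       cassandraContainer.start(); // started explicitly instead of via @ClassRule
       // ... open the session, create keyspace/tables, etc.
   }

   @AfterClass
   public static void stopCassandra() {
       if (cassandraContainer != null) {
           cassandraContainer.stop(); // torn down explicitly as well
       }
   }
   ```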



##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -37,8 +37,10 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.1</driver.version>
 		<guava.version>18.0</guava.version>

Review Comment:
   did you check whether the driver now uses a different guava version?



##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   This seems strange. The CassandraSinkBase explicitly contains logic to make sure all async requests have completed when shutting down or creating a checkpoint (#flush).
   
   If some issue now appears in the test that implies that either
   a) this logic is now broken, which would be problematic in general since it could cause data-loss
   b) despite writes being acknowledged they aren't persisted properly, at which point I wonder what makes the synchronous call that different (maybe it just works because data is sent at a lower rate)
   
   In any case this needs further investigation.



##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Cassandra40ConnectorITCase.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.testcontainers.containers.CassandraContainer;
+
+/** Class for IT cases for all Cassandra sinks tested on Cassandra 4.0.x . */
+public class Cassandra40ConnectorITCase extends CassandraConnectorBaseTest {

Review Comment:
   I'm not sure if we really need tests for different cassandra versions.





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860910308


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   :+1: 





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r859920898


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -61,7 +63,8 @@ under the License.
 							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
 							<artifactSet>
 								<includes>
-									<include>com.datastax.cassandra:cassandra-driver-core</include>
+									<!--driver was relocated by datastax-->

Review Comment:
   Tested on a local Flink cluster with a packaged user jar and it works.





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860943074


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   I dropped the commits 





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r859599210


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -61,7 +63,8 @@ under the License.
 							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
 							<artifactSet>
 								<includes>
-									<include>com.datastax.cassandra:cassandra-driver-core</include>
+									<!--driver was relocated by datastax-->

Review Comment:
   Reverting this change, as the relocation starts with driver 4.0. I'll do all the deps cleaning, create the user jar and test it on a Flink pipeline to check that the deps and shading are OK.





[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r897582880


##########
flink-connectors/flink-connector-cassandra/src/main/resources/META-INF/NOTICE:
##########
@@ -6,9 +6,9 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) 
 
-- com.datastax.cassandra:cassandra-driver-core:3.0.0
-- com.datastax.cassandra:cassandra-driver-mapping:3.0.0
-- com.google.guava:guava:18.0
+- com.datastax.cassandra:cassandra-driver-core:3.11.2
+- com.datastax.cassandra:cassandra-driver-mapping:3.11.2
+- com.google.guava:guava:19.0
 - io.netty:netty-buffer:4.1.70.Final

Review Comment:
   eeeeeeeeeeeeeeeeeeeeeeeeeh
   
   The license check now complains that netty isn't being bundled, because the shade-plugin doesn't see it.
   So I guess we'll just remove these entries.





[GitHub] [flink] mzuehlke commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
mzuehlke commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r862776681


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   > I dropped the commits
   
   @echauchot All the `sink.setSynchronousWrites` calls are still in. Did I get the conversation right that they should all be dropped from this PR?





[GitHub] [flink] zentol commented on pull request #19586: [FLINK-26824] [FLINK-27457] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions and support flush mechanism on outputFormats

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1120143673

   > It could be; in that case the uncovered race condition won't be fixed. So the output formats tests would fail on cassandra 4.0 between the two PR merges. I guess if we merge the two PRs almost at the same time, it won't be a problem
   
   We would naturally first merge the flushing changes, and then rebase this PR on top of that.




[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r895688621


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>
+		<guava.version>19.0</guava.version>

Review Comment:
   cf my comment below about test leftover





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r895748559


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java:
##########
@@ -106,6 +106,8 @@
 
     private static final int MAX_CONNECTION_RETRY = 3;
     private static final long CONNECTION_RETRY_DELAY = 500L;
+    private static final int WRITE_TIMEOUT = 10;

Review Comment:
   conflict resolution bug, thanks for catching it.





[GitHub] [flink] zentol commented on pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1156411732

   @flinkbot run azure




[GitHub] [flink] flinkbot commented on pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1109903067

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bac3ee51471798a0c3904872e1a347c8b3f64904",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bac3ee51471798a0c3904872e1a347c8b3f64904",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bac3ee51471798a0c3904872e1a347c8b3f64904 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860842312


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   then we should instead introduce the flushing logic into the output format imo.





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860847936


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   Ok, I can do that if you prefer. Do you want me to drop the synchronous write feature from all the sinks?





[GitHub] [flink] zentol commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r896866967


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>

Review Comment:
   >  Is it conflicting with netty elsewhere in Flink?
   
   Not necessarily with Flink, but netty is quite common and used by many systems (including several other connectors afaik), which is why we relocate it.





[GitHub] [flink] absurdfarce commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
absurdfarce commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r895920666


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>

Review Comment:
   You should be able to safely exclude the jnr-ffi and jnr-posix dependencies.  The driver uses jnr-posix to access native calls for two optimizations (easy access to the PID + microsecond timestamp precision), both of which are indeed optional (the driver will detect their absence and has non-native fallbacks available for both cases).  You'll get debug-level slf4j stack traces but that should be the worst consequence.
   
   The jackson-databind dependency is used for parsing an XML config in the Astra connectivity code.  This can also be excluded for the non-Astra case (which presumably is the default).
   
   A hearty 👍 to the use of the shaded JAR as well.
   
   I have a simple test app that builds and runs without issue using the following Gradle dependency:
   
   ```
       compile('com.datastax.cassandra:cassandra-driver-core:3.11.2:shaded') {
           exclude group:'io.netty', module:'netty-handler'
           exclude group:'com.github.jnr', module:'jnr-ffi'
           exclude group:'com.github.jnr', module:'jnr-posix'
           exclude group:'com.fasterxml.jackson.core', module:'jackson-databind'
       }
   ```





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r896832924


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>

Review Comment:
   @zentol a question related to netty: I went through [FLINK-8295](https://issues.apache.org/jira/browse/FLINK-8295), and in this ticket we relocate netty to `com.datastax.shaded.netty` so that the cassandra driver can find it. But it would also have found it under the default `io.netty` package, so why can't we just keep the netty transitive dependency as it is? Is it conflicting with netty elsewhere in Flink? (I checked that the version used in cassandra is the same as in the flink parent.)





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r896506025


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -36,9 +36,11 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
-		<guava.version>18.0</guava.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.2</driver.version>

Review Comment:
   @absurdfarce thanks for jumping on this PR. And thanks for the confirmation and recommendations. I'll do what you both recommend.





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r897685917


##########
flink-connectors/flink-connector-cassandra/src/main/resources/META-INF/NOTICE:
##########
@@ -6,9 +6,9 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) 
 
-- com.datastax.cassandra:cassandra-driver-core:3.0.0
-- com.datastax.cassandra:cassandra-driver-mapping:3.0.0
-- com.google.guava:guava:18.0
+- com.datastax.cassandra:cassandra-driver-core:3.11.2
+- com.datastax.cassandra:cassandra-driver-mapping:3.11.2
+- com.google.guava:guava:19.0
 - io.netty:netty-buffer:4.1.70.Final

Review Comment:
   yet another check that I was not aware of :smile: and that I do not see in my local `clean verify` on the cassandra module. Thanks for pointing it out @zentol.





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860674923


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   To give some more details about the uncovered race condition I mentioned: when I upgraded the versions, `CassandraConnectorITCase#testCassandraBatchPojoFormat` started to fail, claiming that no records were written. So the problem was not in `CassandraSinkBase` but in `CassandraPojoOutputFormat`: the Cassandra session was closed before the asynchronous writes had finished, leading to a Cassandra exception saying that the session was already closed. That is why I added the option for synchronous writes on `CassandraPojoOutputFormat`, so that `sink.writeRecord(pojo)` becomes a blocking call and `sink.close()` is not called until the write is actually done. I then generalized this option to all sinks for consistency. In short, there was no problem with `CassandraSinkBase` and its subclasses; their `flush` behavior works just fine.
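   
   For illustration, here is a minimal sketch of one way to make a single write blocking against the driver 3.x async API (the helper name and the CQL string are made up; this illustrates the idea, not the PR's actual implementation):
   
   ```java
   import com.datastax.driver.core.ResultSet;
   import com.datastax.driver.core.ResultSetFuture;
   import com.datastax.driver.core.Session;
   
   import java.util.concurrent.TimeUnit;
   
   /** Hypothetical helper: block until one asynchronous Cassandra write has completed. */
   final class BlockingWriteSketch {
   
       static ResultSet writeSynchronously(Session session, String insertCql, int timeoutSeconds)
               throws Exception {
           // executeAsync returns immediately; the actual write happens in the background.
           ResultSetFuture future = session.executeAsync(insertCql);
           // Waiting on the future before closing the session removes the race between
           // pending writes and session shutdown described above.
           return future.get(timeoutSeconds, TimeUnit.SECONDS);
       }
   }
   ```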





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860683628


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Cassandra40ConnectorITCase.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.testcontainers.containers.CassandraContainer;
+
+/** Class for IT cases for all Cassandra sinks tested on Cassandra 4.0.x . */
+public class Cassandra40ConnectorITCase extends CassandraConnectorBaseTest {

Review Comment:
   This makes total sense to me, considering that there is neither a production code difference nor a dependency difference between the versions. I can definitely remove all the sub-classes and put all the test code back into `CassandraConnectorITCase` with a static 4.0.3 container. This will make the test code much simpler.
   At least we now know that this support indeed works :smile: 
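   
   A minimal sketch of what that single-container setup could look like (the image tag is written inline here purely for illustration; the real code would keep using the `DockerImageVersions` constant):
   
   ```java
   import org.junit.ClassRule;
   import org.testcontainers.containers.CassandraContainer;
   
   /** Hypothetical shape of the merged IT case: one Cassandra 4.0.x container shared by all tests. */
   public abstract class CassandraITCaseSketch {
   
       // Testcontainers starts this container once for the whole test class via the ClassRule.
       @ClassRule
       public static final CassandraContainer<?> CASSANDRA_CONTAINER =
               new CassandraContainer<>("cassandra:4.0.3");
   
       // Tests build their cluster/session against the container's host and mapped port.
       protected static String cassandraHost() {
           return CASSANDRA_CONTAINER.getHost();
       }
   }
   ```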





[GitHub] [flink] zentol commented on pull request #19586: [FLINK-26824] [FLINK-27457] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions and support flush mechanism on outputFormats

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1119539502

   Would you mind moving the flushing changes into another PR?




[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r862833765


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java:
##########
@@ -72,6 +74,16 @@
         ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Set writes to be synchronous (block until writes are completed).
+     *
+     * @param timeout Maximum number of seconds to wait for write completion
+     */
+    public void setSynchronousWrites(int timeout) {

Review Comment:
   @mzuehlke yes, I have not pushed the drop yet because I'm replacing the synchronous writes with an implementation of the flush mechanism as part of [FLINK-27457](https://issues.apache.org/jira/browse/FLINK-27457). I'm done with the production code; I just need to add the tests for the flush mechanism. I should be able to push all of that soon, and this PR will then address the 2 tickets.





[GitHub] [flink] mzuehlke commented on pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
mzuehlke commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1114757318

   Great finding that the 3.x drivers are still maintained and support Cassandra 4.x 👍 
   Btw, in the meantime version 3.11.2 has been released:
   https://github.com/datastax/java-driver/blob/3.x/changelog/README.md#3112
   https://github.com/datastax/java-driver/releases/tag/3.11.2




[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] [FLINK-27457] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions and support flush mechanism on outputFormats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1121050292

   > Please note that the existing class `CassandraOutputFormatBase`, which was previously used as a base class only for the Tuple and Row output formats, is now used as a base class for all 3 output formats including the Pojo one. The base class for column-based output formats (tuple and row) is now a new class called `CassandraColumnarOutputFormatBase`.
   > Regarding configuration of the flush, I preferred simple setters over a configuration object as there were no builders for the output formats.
   > Regarding other modules: I extracted a utility method for semaphore management (`SinkUtils`) because it is now used by both the sinks and the output formats. I also had to change the exceptions thrown in `OutputFormat` as some methods can now throw `TimeoutException` and `InterruptedException` because of the flush mechanism. I think that is ok as this interface is not user facing.
   
   I just opened: https://github.com/apache/flink/pull/19680




[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] [FLINK-27457] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions and support flush mechanism on outputFormats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1119594598

   > Would you mind moving the flushing changes into another PR?
   
   I could, but in that case the uncovered race condition won't be fixed. So the output format tests would fail on Cassandra 4.0 between the two PR merges.




[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] [FLINK-27457] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions and support flush mechanism on outputFormats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1119360748

   @zentol I added the flush mechanism to all Cassandra output formats, plus the related tests (largely inspired by `CassandraSinkBaseTest`), in the last 3 commits (different ticket). Please note that the existing class `CassandraOutputFormatBase`, which was previously used as a base class only for the Tuple and Row output formats, is now used as a base class for all 3 output formats including the Pojo one. The base class for column-based output formats (tuple and row) is now a new class called `CassandraColumnarOutputFormatBase`.
   Regarding configuration of the flush, I preferred simple setters over a configuration object as there were no builders for the output formats.
   Regarding other modules: I extracted a utility method for semaphore management (`SinkUtils`) because it is now used by both the sinks and the output formats. I also had to change the exceptions thrown in `OutputFormat` as some methods can now throw `TimeoutException` and `InterruptedException` because of the flush mechanism. I think that is ok as this interface is not user facing.
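   
   To make the semaphore-based flush concrete, here is a minimal sketch of the idea (names, signatures and timeout handling are illustrative, not the actual `SinkUtils` or output format code):
   
   ```java
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   /** Hypothetical sketch: each in-flight write holds one permit; flush() waits for all of them. */
   final class FlushSketch {
   
       private final int maxConcurrentRequests;
       private final Semaphore permits;
   
       FlushSketch(int maxConcurrentRequests) {
           this.maxConcurrentRequests = maxConcurrentRequests;
           this.permits = new Semaphore(maxConcurrentRequests);
       }
   
       /** Called before issuing an asynchronous write; blocks when too many writes are in flight. */
       void acquirePermit(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
           if (!permits.tryAcquire(timeout, unit)) {
               throw new TimeoutException("Could not acquire a permit to send a record");
           }
       }
   
       /** Called from the write's completion callback (success or failure). */
       void releasePermit() {
           permits.release();
       }
   
       /** Waits until every pending write has released its permit, i.e. all writes are done. */
       void flush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
           if (!permits.tryAcquire(maxConcurrentRequests, timeout, unit)) {
               throw new TimeoutException("Flush timed out while writes were still pending");
           }
           permits.release(maxConcurrentRequests);
       }
   }
   ```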
   
   PTAL
   
   CC: @mzuehlke 




[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860820983


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Cassandra30ConnectorITCase.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.testcontainers.containers.CassandraContainer;
+
+/** Class for IT cases for all Cassandra sinks tested on Cassandra 3.0.x . */
+public class Cassandra30ConnectorITCase extends CassandraConnectorBaseTest {
+    private static final String TESTED_VERSION = DockerImageVersions.CASSANDRA_3_0;
+
+    @ClassRule
+    public static final CassandraContainer CASSANDRA_CONTAINER =

Review Comment:
   Dropping the sub-classes, cf. https://github.com/apache/flink/pull/19586#discussion_r860683628





[GitHub] [flink] echauchot commented on pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19586:
URL: https://github.com/apache/flink/pull/19586#issuecomment-1112142241

   @zentol thanks for the review. I addressed all your comments. I kept the commits addressing the review comments isolated to ease your second round of review. But in the end I think we should squash the test code commits together, maybe leaving [bac3ee51471798a0c3904872e1a347c8b3f64904](https://github.com/apache/flink/pull/19586/commits/bac3ee51471798a0c3904872e1a347c8b3f64904) on its own. As for the production code commits, I would avoid squashing them with test code commits as they add a new feature to the sinks.




[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r859596704


##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -37,8 +37,10 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.1</driver.version>
 		<guava.version>18.0</guava.version>

Review Comment:
   Actually, I don't think this explicit dependency was needed in the first place. Indeed, cassandra driver 3.0.0 already had `com.google.guava:guava:jar:16.0.1:compile` as a transitive dependency, and our code depends on the shaded guava (`import org.apache.flink.shaded.guava30.com.google.common.base.Strings;`). Removing it.
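   
   For example, the connector code goes through Flink's relocated Guava rather than a direct `com.google.guava` dependency (a trivial hypothetical snippet, just to show the shaded import in use):
   
   ```java
   // Flink's flink-shaded-guava artifact, not com.google.guava:guava.
   import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
   
   final class ShadedGuavaUsageSketch {
   
       static boolean isMissing(String value) {
           return Strings.isNullOrEmpty(value);
       }
   }
   ```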



##########
flink-connectors/flink-connector-cassandra/pom.xml:
##########
@@ -37,8 +37,10 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.5</cassandra.version>
-		<driver.version>3.0.0</driver.version>
+		<!-- Cassandra version 4.x allow to address clusters of version 3.x as well -->
+		<cassandra.version>4.0.3</cassandra.version>
+		<!--driver 3.x works with 3.x and 4.x versions of Cassandra but driver 4.x is a complete refactoring with different API-->
+		<driver.version>3.11.1</driver.version>
 		<guava.version>18.0</guava.version>

Review Comment:
   :+1: 





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860599374


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Cassandra40ConnectorITCase.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.testcontainers.containers.CassandraContainer;
+
+/** Class for IT cases for all Cassandra sinks tested on Cassandra 4.0.x . */
+public class Cassandra40ConnectorITCase extends CassandraConnectorBaseTest {

Review Comment:
   You mean we should test only the latest version? The ticket states that we should support all the versions that Apache Cassandra supports (3.0, 4.0, 3.11); how can we be sure that we support them if we don't have these version-specific tests?





[GitHub] [flink] echauchot commented on a diff in pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19586:
URL: https://github.com/apache/flink/pull/19586#discussion_r860595342


##########
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorBaseTest.java:
##########
@@ -220,7 +225,7 @@ private static Class<? extends Pojo> annotatePojoWithTable(String keyspace, Stri
     }
 
     @NotNull
-    private static Table createTableAnnotation(String keyspace, String tableName) {
+    private Table createTableAnnotation(String keyspace, String tableName) {

Review Comment:
   ok, :+1: 





[GitHub] [flink] zentol merged pull request #19586: [FLINK-26824] Upgrade Flink's supported Cassandra versions to match all Apache Cassandra supported versions

Posted by GitBox <gi...@apache.org>.
zentol merged PR #19586:
URL: https://github.com/apache/flink/pull/19586

