Posted to commits@flink.apache.org by ch...@apache.org on 2022/12/07 08:18:01 UTC

[flink] branch master updated (4c67f8fca52 -> 12cb2212f78)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 4c67f8fca52 [FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been consumed from memory
     new 7bb75ab6d96 [hotfix][python] Depend on externalized ES connector
     new 12cb2212f78 [FLINK-30312][cassandra] Remove Cassandra connector

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/connectors/datastream/cassandra.md        | 288 -------
 .../docs/connectors/datastream/cassandra.md        | 288 -------
 docs/setup_docs.sh                                 |   1 +
 .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8           |   4 +-
 .../b8900323-6aab-4e7e-9b17-f53b3c3dca46           |   3 -
 .../flink-architecture-tests-production/pom.xml    |   5 -
 flink-architecture-tests/pom.xml                   |   7 -
 .../b7279bb1-1eb7-40c0-931d-f6db7971d126           |   0
 .../dc1ba6f4-3d84-498c-a085-e02ba5936201           |   6 -
 .../archunit-violations/stored.rules               |   4 -
 flink-connectors/flink-connector-cassandra/pom.xml | 218 -----
 .../CassandraColumnarOutputFormatBase.java         |  68 --
 .../connectors/cassandra/CassandraInputFormat.java |  68 --
 .../cassandra/CassandraInputFormatBase.java        | 102 ---
 .../cassandra/CassandraOutputFormat.java           |  35 -
 .../cassandra/CassandraOutputFormatBase.java       | 129 ---
 .../cassandra/CassandraPojoInputFormat.java        |  81 --
 .../cassandra/CassandraPojoOutputFormat.java       |  97 ---
 .../cassandra/CassandraRowOutputFormat.java        |  52 --
 .../cassandra/CassandraTupleOutputFormat.java      |  54 --
 .../cassandra/AbstractCassandraTupleSink.java      |  72 --
 .../cassandra/CassandraAppendTableSink.java        | 103 ---
 .../connectors/cassandra/CassandraCommitter.java   | 160 ----
 .../cassandra/CassandraFailureHandler.java         |  62 --
 .../connectors/cassandra/CassandraPojoSink.java    | 125 ---
 .../connectors/cassandra/CassandraRowSink.java     |  60 --
 .../cassandra/CassandraRowWriteAheadSink.java      | 170 ----
 .../cassandra/CassandraScalaProductSink.java       |  55 --
 .../connectors/cassandra/CassandraSink.java        | 663 ---------------
 .../connectors/cassandra/CassandraSinkBase.java    | 194 -----
 .../cassandra/CassandraSinkBaseConfig.java         | 125 ---
 .../connectors/cassandra/CassandraTupleSink.java   |  53 --
 .../cassandra/CassandraTupleWriteAheadSink.java    | 170 ----
 .../connectors/cassandra/ClusterBuilder.java       |  43 -
 .../connectors/cassandra/MapperOptions.java        |  37 -
 .../cassandra/NoOpCassandraFailureHandler.java     |  35 -
 .../connectors/cassandra/SimpleMapperOptions.java  | 121 ---
 .../src/main/resources/META-INF/NOTICE             |  11 -
 .../architecture/TestCodeArchitectureTest.java     |  43 -
 .../connectors/cassandra/example/BatchExample.java |  88 --
 .../cassandra/example/BatchPojoExample.java        |  93 ---
 .../batch/connectors/cassandra/example/Pojo.java   |  72 --
 .../cassandra/utils/ResultSetFutures.java          | 103 ---
 .../cassandra/CassandraConnectorITCase.java        | 912 ---------------------
 .../cassandra/CassandraSinkBaseTest.java           | 453 ----------
 .../CassandraTupleWriteAheadSinkTest.java          | 150 ----
 .../flink/streaming/connectors/cassandra/Pojo.java |  70 --
 .../example/CassandraPojoSinkExample.java          |  68 --
 .../example/CassandraTupleSinkExample.java         |  68 --
 .../CassandraTupleWriteAheadSinkExample.java       | 118 ---
 .../connectors/cassandra/example/Message.java      |  62 --
 .../org.junit.jupiter.api.extension.Extension      |  16 -
 .../src/test/resources/archunit.properties         |  31 -
 .../src/test/resources/log4j2-test.properties      |  28 -
 flink-connectors/pom.xml                           |   1 -
 flink-python/pom.xml                               |   4 +-
 .../org/apache/flink/util/DockerImageVersions.java |   2 -
 tools/azure-pipelines/cache_docker_images.sh       |   2 +-
 tools/ci/stage.sh                                  |   1 -
 59 files changed, 5 insertions(+), 6149 deletions(-)
 delete mode 100644 docs/content.zh/docs/connectors/datastream/cassandra.md
 delete mode 100644 docs/content/docs/connectors/datastream/cassandra.md
 delete mode 100644 flink-connectors/flink-connector-cassandra/archunit-violations/b7279bb1-1eb7-40c0-931d-f6db7971d126
 delete mode 100644 flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201
 delete mode 100644 flink-connectors/flink-connector-cassandra/archunit-violations/stored.rules
 delete mode 100644 flink-connectors/flink-connector-cassandra/pom.xml
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraColumnarOutputFormatBase.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoOutputFormat.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/NoOpCassandraFailureHandler.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleMapperOptions.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/main/resources/META-INF/NOTICE
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/ResultSetFutures.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/resources/archunit.properties
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/resources/log4j2-test.properties


[flink] 02/02: [FLINK-30312][cassandra] Remove Cassandra connector

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 12cb2212f7836fe668edc0523f4f4e2efbeaf8de
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Dec 6 11:33:03 2022 +0100

    [FLINK-30312][cassandra] Remove Cassandra connector
---
 .../docs/connectors/datastream/cassandra.md        | 288 -------
 .../docs/connectors/datastream/cassandra.md        | 288 -------
 docs/setup_docs.sh                                 |   1 +
 .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8           |   4 +-
 .../b8900323-6aab-4e7e-9b17-f53b3c3dca46           |   3 -
 .../flink-architecture-tests-production/pom.xml    |   5 -
 flink-architecture-tests/pom.xml                   |   7 -
 .../b7279bb1-1eb7-40c0-931d-f6db7971d126           |   0
 .../dc1ba6f4-3d84-498c-a085-e02ba5936201           |   6 -
 .../archunit-violations/stored.rules               |   4 -
 flink-connectors/flink-connector-cassandra/pom.xml | 218 -----
 .../CassandraColumnarOutputFormatBase.java         |  68 --
 .../connectors/cassandra/CassandraInputFormat.java |  68 --
 .../cassandra/CassandraInputFormatBase.java        | 102 ---
 .../cassandra/CassandraOutputFormat.java           |  35 -
 .../cassandra/CassandraOutputFormatBase.java       | 129 ---
 .../cassandra/CassandraPojoInputFormat.java        |  81 --
 .../cassandra/CassandraPojoOutputFormat.java       |  97 ---
 .../cassandra/CassandraRowOutputFormat.java        |  52 --
 .../cassandra/CassandraTupleOutputFormat.java      |  54 --
 .../cassandra/AbstractCassandraTupleSink.java      |  72 --
 .../cassandra/CassandraAppendTableSink.java        | 103 ---
 .../connectors/cassandra/CassandraCommitter.java   | 160 ----
 .../cassandra/CassandraFailureHandler.java         |  62 --
 .../connectors/cassandra/CassandraPojoSink.java    | 125 ---
 .../connectors/cassandra/CassandraRowSink.java     |  60 --
 .../cassandra/CassandraRowWriteAheadSink.java      | 170 ----
 .../cassandra/CassandraScalaProductSink.java       |  55 --
 .../connectors/cassandra/CassandraSink.java        | 663 ---------------
 .../connectors/cassandra/CassandraSinkBase.java    | 194 -----
 .../cassandra/CassandraSinkBaseConfig.java         | 125 ---
 .../connectors/cassandra/CassandraTupleSink.java   |  53 --
 .../cassandra/CassandraTupleWriteAheadSink.java    | 170 ----
 .../connectors/cassandra/ClusterBuilder.java       |  43 -
 .../connectors/cassandra/MapperOptions.java        |  37 -
 .../cassandra/NoOpCassandraFailureHandler.java     |  35 -
 .../connectors/cassandra/SimpleMapperOptions.java  | 121 ---
 .../src/main/resources/META-INF/NOTICE             |  11 -
 .../architecture/TestCodeArchitectureTest.java     |  43 -
 .../connectors/cassandra/example/BatchExample.java |  88 --
 .../cassandra/example/BatchPojoExample.java        |  93 ---
 .../batch/connectors/cassandra/example/Pojo.java   |  72 --
 .../cassandra/utils/ResultSetFutures.java          | 103 ---
 .../cassandra/CassandraConnectorITCase.java        | 912 ---------------------
 .../cassandra/CassandraSinkBaseTest.java           | 453 ----------
 .../CassandraTupleWriteAheadSinkTest.java          | 150 ----
 .../flink/streaming/connectors/cassandra/Pojo.java |  70 --
 .../example/CassandraPojoSinkExample.java          |  68 --
 .../example/CassandraTupleSinkExample.java         |  68 --
 .../CassandraTupleWriteAheadSinkExample.java       | 118 ---
 .../connectors/cassandra/example/Message.java      |  62 --
 .../org.junit.jupiter.api.extension.Extension      |  16 -
 .../src/test/resources/archunit.properties         |  31 -
 .../src/test/resources/log4j2-test.properties      |  28 -
 flink-connectors/pom.xml                           |   1 -
 flink-python/pom.xml                               |   2 +-
 .../org/apache/flink/util/DockerImageVersions.java |   2 -
 tools/azure-pipelines/cache_docker_images.sh       |   2 +-
 tools/ci/stage.sh                                  |   1 -
 59 files changed, 4 insertions(+), 6148 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/cassandra.md b/docs/content.zh/docs/connectors/datastream/cassandra.md
deleted file mode 100644
index 4969027a8c0..00000000000
--- a/docs/content.zh/docs/connectors/datastream/cassandra.md
+++ /dev/null
@@ -1,288 +0,0 @@
----
-title: Cassandra
-weight: 4
-type: docs
-aliases:
-  - /zh/dev/connectors/cassandra.html
-  - /zh/apis/streaming/connectors/cassandra.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Apache Cassandra Connector
-
-This connector provides sinks that write data into an [Apache Cassandra](https://cassandra.apache.org/) database.
-
-<!--
-  TODO: Perhaps worth mentioning current DataStax Java Driver version to match Cassandra version on user side.
--->
-
-To use this connector, add the following dependency to your project:
-
-{{< artifact flink-connector-cassandra withScalaVersion >}}
-
-Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
-
-## Installing Apache Cassandra
-There are multiple ways to bring up a Cassandra instance on a local machine:
-
-1. Follow the instructions on the [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
-2. Launch a container running Cassandra from the [official Docker repository](https://hub.docker.com/_/cassandra/).
-
-## Cassandra Sinks
-
-### Configurations
-
-Flink's Cassandra sinks are created using the static `CassandraSink.addSink(DataStream<IN> input)` method.
-This method returns a `CassandraSinkBuilder`, which offers methods to further configure the sink, and finally `build()` the sink instance.
-
-The following configuration methods can be used:
-
-1. _setQuery(String query)_
-    * Sets the upsert query that is executed for every record the sink receives.
-    * The query is internally treated as a CQL statement.
-    * __DO__ set the upsert query for processing __Tuple__ data types.
-    * __DO NOT__ set the query for processing __POJO__ data types.
-2. _setClusterBuilder(ClusterBuilder clusterBuilder)_
-    * Sets the cluster builder that is used to configure the connection to Cassandra with more sophisticated settings such as consistency level and retry policy (see the sketch after this list).
-3. _setHost(String host[, int port])_
-    * A simple alternative to setClusterBuilder() that only takes host/port information for connecting to Cassandra instances.
-4. _setMapperOptions(MapperOptions options)_
-    * Sets the mapper options that are used to configure the DataStax ObjectMapper.
-    * Only applies when processing __POJO__ data types.
-5. _setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout)_
-    * Sets the maximum allowed number of concurrent requests with a timeout for acquiring permits to execute.
-    * Only applies when __enableWriteAheadLog()__ is not configured.
-6. _enableWriteAheadLog([CheckpointCommitter committer])_
-    * An __optional__ setting
-    * Allows exactly-once processing for non-deterministic algorithms.
-7. _setFailureHandler([CassandraFailureHandler failureHandler])_
-    * An __optional__ setting
-    * Sets the custom failure handler.
-8. _setDefaultKeyspace(String keyspace)_
-    * Sets the default keyspace to be used.
-9. _enableIgnoreNullFields()_
-    * Enables ignoring null values: null values are treated as unset, which avoids writing null fields and creating tombstones.
-10. _build()_
-    * Finalizes the configuration and constructs the CassandraSink instance.
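-
-The following is a minimal sketch of what `setClusterBuilder()` enables, assuming the DataStax 3.x driver API that this connector bundles; the QUORUM consistency level and default retry policy are illustrative choices, not recommendations:
-
-```java
-CassandraSink.addSink(result)
-        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
-        .setClusterBuilder(new ClusterBuilder() {
-            @Override
-            protected Cluster buildCluster(Cluster.Builder builder) {
-                // Cluster, QueryOptions, ConsistencyLevel and DefaultRetryPolicy come from
-                // the shaded DataStax driver (com.datastax.driver.core[.policies] packages)
-                return builder
-                        .addContactPoint("127.0.0.1")
-                        .withPort(9042)
-                        .withQueryOptions(new QueryOptions()
-                                .setConsistencyLevel(ConsistencyLevel.QUORUM))
-                        .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
-                        .build();
-            }
-        })
-        .build();
-```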
-
-### Write-ahead Log
-
-A checkpoint committer stores additional information about completed checkpoints
-in some resource. This information is used to prevent a full replay of the last
-completed checkpoint in case of a failure.
-You can use a `CassandraCommitter` to store these in a separate table in Cassandra.
-Note that this table will NOT be cleaned up by Flink.
-
-Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple
-times without changing the result) and checkpointing is enabled. In case of a failure, the failed
-checkpoint will be replayed completely.
-
-Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program
-the replayed checkpoint may be completely different from the previous attempt, which may leave the
-database in an inconsistent state since part of the first attempt may already be written.
-The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt.
-Note that enabling this feature will have an adverse impact on latency.
-
-<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.</p>
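-
-A minimal sketch of enabling the write-ahead log with a `CassandraCommitter` (the `clusterBuilder` variable is assumed to be a reusable `ClusterBuilder` instance; the committer stores the checkpoint metadata in its own Cassandra table):
-
-```java
-CassandraSink.addSink(result)
-        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
-        .setClusterBuilder(clusterBuilder)
-        // store information about completed checkpoints in Cassandra itself
-        .enableWriteAheadLog(new CassandraCommitter(clusterBuilder))
-        .build();
-```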
-
-### Checkpointing and Fault Tolerance
-With checkpointing enabled, the Cassandra sink guarantees at-least-once delivery of action requests to the C* instance.
-
-More details can be found in the [checkpointing docs]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing" >}}) and the [fault tolerance guarantees docs]({{< ref "docs/connectors/datastream/guarantees" >}}).
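-
-For example, checkpointing is enabled on the execution environment before the job is built (the 5000 ms interval is illustrative):
-
-```java
-// enable checkpointing so the sink can provide its delivery guarantees
-env.enableCheckpointing(5000); // checkpoint every 5 seconds
-```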
-
-## Examples
-
-The Cassandra sink currently supports both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use of those streaming data types, please refer to [Supported Data Types]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}#supported-data-types). We show two implementations based on {{< gh_link file="flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWord [...]
-
-In all these examples, we assume the associated keyspace `example` and table `wordcount` have been created.
-
-{{< tabs "ffc5c4d4-7872-479c-bfa6-206b9e96f6f3" >}}
-{{< tab "CQL" >}}
-```sql
-CREATE KEYSPACE IF NOT EXISTS example
-    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
-
-CREATE TABLE IF NOT EXISTS example.wordcount (
-    word text,
-    count bigint,
-    PRIMARY KEY(word)
-);
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-### Cassandra Sink Example for Streaming Tuple Data Type
-When storing results with the Java/Scala Tuple data type to a Cassandra sink, a CQL upsert statement must be set (via `setQuery()`) to persist each record back to the database. The upsert query is cached as a `PreparedStatement`, and each Tuple element is converted to a parameter of the statement.
-
-For details about `PreparedStatement` and `BoundStatement`, please visit the [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
-
-{{< tabs "1a84c6a0-0b2f-4f96-8cf8-43ec6dd3bc5d" >}}
-{{< tab "Java" >}}
-```java
-// get the execution environment
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// get input data by connecting to the socket
-DataStream<String> text = env.socketTextStream(hostname, port, "\n");
-
-// parse the data, group it, window it, and aggregate the counts
-DataStream<Tuple2<String, Long>> result = text
-        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
-            @Override
-            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
-                // normalize and split the line
-                String[] words = value.toLowerCase().split("\\s");
-
-                // emit the pairs
-                for (String word : words) {
-                    // Do not accept empty words, since word is defined as the primary key in the C* table
-                    if (!word.isEmpty()) {
-                        out.collect(new Tuple2<String, Long>(word, 1L));
-                    }
-                }
-            }
-        })
-        .keyBy(value -> value.f0)
-        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-        .sum(1);
-
-CassandraSink.addSink(result)
-        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
-        .setHost("127.0.0.1")
-        .build();
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-// get input data by connecting to the socket
-val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
-
-// parse the data, group it, window it, and aggregate the counts
-val result: DataStream[(String, Long)] = text
-  // split up the lines in pairs (2-tuples) containing: (word,1)
-  .flatMap(_.toLowerCase.split("\\s"))
-  .filter(_.nonEmpty)
-  .map((_, 1L))
-  // group by the tuple field "0" and sum up tuple field "1"
-  .keyBy(_._1)
-  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-  .sum(1)
-
-CassandraSink.addSink(result)
-  .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
-  .setHost("127.0.0.1")
-  .build()
-
-result.print().setParallelism(1)
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-
-### Cassandra Sink Example for Streaming POJO Data Type
-This example streams a POJO data type and stores the same POJO entity back to Cassandra. The POJO class must be annotated as described in the [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/), since each field of the entity is mapped to an associated column of the designated table by the DataStax Java Driver `com.datastax.driver.mapping.Mapper` class.
-
-The mapping of each table column can be defined through annotations placed on a field declaration in the POJO class. For details of the mapping, please refer to the CQL documentation on [Definition of Mapped Classes](http://docs.datastax.com/en/developer/java-driver/3.1/manual/object_mapper/creating/) and [CQL Data Types](https://docs.datastax.com/en/cql/3.1/cql/cql_reference/cql_data_types_c.html).
-
-{{< tabs "d65ca6f5-acb2-4f2c-b5b6-d986eafca765" >}}
-{{< tab "Java" >}}
-```java
-// get the execution environment
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// get input data by connecting to the socket
-DataStream<String> text = env.socketTextStream(hostname, port, "\n");
-
-// parse the data, group it, window it, and aggregate the counts
-DataStream<WordCount> result = text
-        .flatMap(new FlatMapFunction<String, WordCount>() {
-            public void flatMap(String value, Collector<WordCount> out) {
-                // normalize and split the line
-                String[] words = value.toLowerCase().split("\\s");
-
-                // emit the pairs
-                for (String word : words) {
-                    if (!word.isEmpty()) {
-                        // Do not accept empty words, since word is defined as the primary key in the C* table
-                        out.collect(new WordCount(word, 1L));
-                    }
-                }
-            }
-        })
-        .keyBy(WordCount::getWord)
-        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-
-        .reduce(new ReduceFunction<WordCount>() {
-            @Override
-            public WordCount reduce(WordCount a, WordCount b) {
-                return new WordCount(a.getWord(), a.getCount() + b.getCount());
-            }
-        });
-
-CassandraSink.addSink(result)
-        .setHost("127.0.0.1")
-        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
-        .build();
-
-
-@Table(keyspace = "example", name = "wordcount")
-public class WordCount {
-
-    @Column(name = "word")
-    private String word = "";
-
-    @Column(name = "count")
-    private long count = 0;
-
-    public WordCount() {}
-
-    public WordCount(String word, long count) {
-        this.setWord(word);
-        this.setCount(count);
-    }
-
-    public String getWord() {
-        return word;
-    }
-
-    public void setWord(String word) {
-        this.word = word;
-    }
-
-    public long getCount() {
-        return count;
-    }
-
-    public void setCount(long count) {
-        this.count = count;
-    }
-
-    @Override
-    public String toString() {
-        return getWord() + " : " + getCount();
-    }
-}
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/cassandra.md b/docs/content/docs/connectors/datastream/cassandra.md
deleted file mode 100644
index 25b738141e9..00000000000
--- a/docs/content/docs/connectors/datastream/cassandra.md
+++ /dev/null
@@ -1,288 +0,0 @@
----
-title: Cassandra
-weight: 4
-type: docs
-aliases:
-  - /dev/connectors/cassandra.html
-  - /apis/streaming/connectors/cassandra.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Apache Cassandra Connector
-
-This connector provides sinks that write data into an [Apache Cassandra](https://cassandra.apache.org/) database.
-
-<!--
-  TODO: Perhaps worth mentioning current DataStax Java Driver version to match Cassandra version on user side.
--->
-
-To use this connector, add the following dependency to your project:
-
-{{< artifact flink-connector-cassandra withScalaVersion >}}
-
-Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
-
-## Installing Apache Cassandra
-There are multiple ways to bring up a Cassandra instance on a local machine:
-
-1. Follow the instructions on the [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
-2. Launch a container running Cassandra from the [official Docker repository](https://hub.docker.com/_/cassandra/).
-
-## Cassandra Sinks
-
-### Configurations
-
-Flink's Cassandra sinks are created using the static `CassandraSink.addSink(DataStream<IN> input)` method.
-This method returns a `CassandraSinkBuilder`, which offers methods to further configure the sink, and finally `build()` the sink instance.
-
-The following configuration methods can be used:
-
-1. _setQuery(String query)_
-    * Sets the upsert query that is executed for every record the sink receives.
-    * The query is internally treated as a CQL statement.
-    * __DO__ set the upsert query for processing __Tuple__ data types.
-    * __DO NOT__ set the query for processing __POJO__ data types.
-2. _setClusterBuilder(ClusterBuilder clusterBuilder)_
-    * Sets the cluster builder that is used to configure the connection to Cassandra with more sophisticated settings such as consistency level and retry policy (see the sketch after this list).
-3. _setHost(String host[, int port])_
-    * A simple alternative to setClusterBuilder() that only takes host/port information for connecting to Cassandra instances.
-4. _setMapperOptions(MapperOptions options)_
-    * Sets the mapper options that are used to configure the DataStax ObjectMapper.
-    * Only applies when processing __POJO__ data types.
-5. _setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout)_
-    * Sets the maximum allowed number of concurrent requests with a timeout for acquiring permits to execute.
-    * Only applies when __enableWriteAheadLog()__ is not configured.
-6. _enableWriteAheadLog([CheckpointCommitter committer])_
-    * An __optional__ setting
-    * Allows exactly-once processing for non-deterministic algorithms.
-7. _setFailureHandler([CassandraFailureHandler failureHandler])_
-    * An __optional__ setting
-    * Sets the custom failure handler.
-8. _setDefaultKeyspace(String keyspace)_
-    * Sets the default keyspace to be used.
-9. _enableIgnoreNullFields()_
-    * Enables ignoring null values: null values are treated as unset, which avoids writing null fields and creating tombstones.
-10. _build()_
-    * Finalizes the configuration and constructs the CassandraSink instance.
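-
-The following is a minimal sketch of what `setClusterBuilder()` enables, assuming the DataStax 3.x driver API that this connector bundles; the QUORUM consistency level and default retry policy are illustrative choices, not recommendations:
-
-```java
-CassandraSink.addSink(result)
-        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
-        .setClusterBuilder(new ClusterBuilder() {
-            @Override
-            protected Cluster buildCluster(Cluster.Builder builder) {
-                // Cluster, QueryOptions, ConsistencyLevel and DefaultRetryPolicy come from
-                // the shaded DataStax driver (com.datastax.driver.core[.policies] packages)
-                return builder
-                        .addContactPoint("127.0.0.1")
-                        .withPort(9042)
-                        .withQueryOptions(new QueryOptions()
-                                .setConsistencyLevel(ConsistencyLevel.QUORUM))
-                        .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
-                        .build();
-            }
-        })
-        .build();
-```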
-
-### Write-ahead Log
-
-A checkpoint committer stores additional information about completed checkpoints
-in some resource. This information is used to prevent a full replay of the last
-completed checkpoint in case of a failure.
-You can use a `CassandraCommitter` to store these in a separate table in Cassandra.
-Note that this table will NOT be cleaned up by Flink.
-
-Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple
-times without changing the result) and checkpointing is enabled. In case of a failure, the failed
-checkpoint will be replayed completely.
-
-Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program
-the replayed checkpoint may be completely different from the previous attempt, which may leave the
-database in an inconsistent state since part of the first attempt may already be written.
-The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt.
-Note that enabling this feature will have an adverse impact on latency.
-
-<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.</p>
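-
-A minimal sketch of enabling the write-ahead log with a `CassandraCommitter` (the `clusterBuilder` variable is assumed to be a reusable `ClusterBuilder` instance; the committer stores the checkpoint metadata in its own Cassandra table):
-
-```java
-CassandraSink.addSink(result)
-        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
-        .setClusterBuilder(clusterBuilder)
-        // store information about completed checkpoints in Cassandra itself
-        .enableWriteAheadLog(new CassandraCommitter(clusterBuilder))
-        .build();
-```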
-
-### Checkpointing and Fault Tolerance
-With checkpointing enabled, the Cassandra sink guarantees at-least-once delivery of action requests to the C* instance.
-
-More details can be found in the [checkpointing docs]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing" >}}) and the [fault tolerance guarantees docs]({{< ref "docs/connectors/datastream/guarantees" >}}).
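-
-For example, checkpointing is enabled on the execution environment before the job is built (the 5000 ms interval is illustrative):
-
-```java
-// enable checkpointing so the sink can provide its delivery guarantees
-env.enableCheckpointing(5000); // checkpoint every 5 seconds
-```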
-
-## Examples
-
-The Cassandra sink currently supports both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use of those streaming data types, please refer to [Supported Data Types]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}#supported-data-types). We show two implementations based on {{< gh_link file="flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWord [...]
-
-In all these examples, we assume the associated keyspace `example` and table `wordcount` have been created.
-
-{{< tabs "ffc5c4d4-7872-479c-bfa6-206b9e96f6f3" >}}
-{{< tab "CQL" >}}
-```sql
-CREATE KEYSPACE IF NOT EXISTS example
-    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
-
-CREATE TABLE IF NOT EXISTS example.wordcount (
-    word text,
-    count bigint,
-    PRIMARY KEY(word)
-);
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-### Cassandra Sink Example for Streaming Tuple Data Type
-When storing results with the Java/Scala Tuple data type to a Cassandra sink, a CQL upsert statement must be set (via `setQuery()`) to persist each record back to the database. The upsert query is cached as a `PreparedStatement`, and each Tuple element is converted to a parameter of the statement.
-
-For details about `PreparedStatement` and `BoundStatement`, please visit the [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
-
-{{< tabs "1a84c6a0-0b2f-4f96-8cf8-43ec6dd3bc5d" >}}
-{{< tab "Java" >}}
-```java
-// get the execution environment
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// get input data by connecting to the socket
-DataStream<String> text = env.socketTextStream(hostname, port, "\n");
-
-// parse the data, group it, window it, and aggregate the counts
-DataStream<Tuple2<String, Long>> result = text
-        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
-            @Override
-            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
-                // normalize and split the line
-                String[] words = value.toLowerCase().split("\\s");
-
-                // emit the pairs
-                for (String word : words) {
-                    // Do not accept empty words, since word is defined as the primary key in the C* table
-                    if (!word.isEmpty()) {
-                        out.collect(new Tuple2<String, Long>(word, 1L));
-                    }
-                }
-            }
-        })
-        .keyBy(value -> value.f0)
-        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-        .sum(1);
-
-CassandraSink.addSink(result)
-        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
-        .setHost("127.0.0.1")
-        .build();
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-// get input data by connecting to the socket
-val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
-
-// parse the data, group it, window it, and aggregate the counts
-val result: DataStream[(String, Long)] = text
-  // split up the lines in pairs (2-tuples) containing: (word,1)
-  .flatMap(_.toLowerCase.split("\\s"))
-  .filter(_.nonEmpty)
-  .map((_, 1L))
-  // group by the tuple field "0" and sum up tuple field "1"
-  .keyBy(_._1)
-  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-  .sum(1)
-
-CassandraSink.addSink(result)
-  .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
-  .setHost("127.0.0.1")
-  .build()
-
-result.print().setParallelism(1)
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-
-### Cassandra Sink Example for Streaming POJO Data Type
-This example streams a POJO data type and stores the same POJO entity back to Cassandra. The POJO class must be annotated as described in the [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/), since each field of the entity is mapped to an associated column of the designated table by the DataStax Java Driver `com.datastax.driver.mapping.Mapper` class.
-
-The mapping of each table column can be defined through annotations placed on a field declaration in the POJO class. For details of the mapping, please refer to the CQL documentation on [Definition of Mapped Classes](http://docs.datastax.com/en/developer/java-driver/3.1/manual/object_mapper/creating/) and [CQL Data Types](https://docs.datastax.com/en/cql/3.1/cql/cql_reference/cql_data_types_c.html).
-
-{{< tabs "d65ca6f5-acb2-4f2c-b5b6-d986eafca765" >}}
-{{< tab "Java" >}}
-```java
-// get the execution environment
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// get input data by connecting to the socket
-DataStream<String> text = env.socketTextStream(hostname, port, "\n");
-
-// parse the data, group it, window it, and aggregate the counts
-DataStream<WordCount> result = text
-        .flatMap(new FlatMapFunction<String, WordCount>() {
-            public void flatMap(String value, Collector<WordCount> out) {
-                // normalize and split the line
-                String[] words = value.toLowerCase().split("\\s");
-
-                // emit the pairs
-                for (String word : words) {
-                    if (!word.isEmpty()) {
-                        // Do not accept empty words, since word is defined as the primary key in the C* table
-                        out.collect(new WordCount(word, 1L));
-                    }
-                }
-            }
-        })
-        .keyBy(WordCount::getWord)
-        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-
-        .reduce(new ReduceFunction<WordCount>() {
-            @Override
-            public WordCount reduce(WordCount a, WordCount b) {
-                return new WordCount(a.getWord(), a.getCount() + b.getCount());
-            }
-        });
-
-CassandraSink.addSink(result)
-        .setHost("127.0.0.1")
-        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
-        .build();
-
-
-@Table(keyspace = "example", name = "wordcount")
-public class WordCount {
-
-    @Column(name = "word")
-    private String word = "";
-
-    @Column(name = "count")
-    private long count = 0;
-
-    public WordCount() {}
-
-    public WordCount(String word, long count) {
-        this.setWord(word);
-        this.setCount(count);
-    }
-
-    public String getWord() {
-        return word;
-    }
-
-    public void setWord(String word) {
-        this.word = word;
-    }
-
-    public long getCount() {
-        return count;
-    }
-
-    public void setCount(long count) {
-        this.count = count;
-    }
-
-    @Override
-    public String toString() {
-        return getWord() + " : " + getCount();
-    }
-}
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-{{< top >}}
diff --git a/docs/setup_docs.sh b/docs/setup_docs.sh
index 1854bd63b82..f66f5a44e70 100755
--- a/docs/setup_docs.sh
+++ b/docs/setup_docs.sh
@@ -53,6 +53,7 @@ cd tmp
 
 integrate_connector_docs elasticsearch v3.0.0
 integrate_connector_docs aws v3.0.0
+integrate_connector_docs cassandra v3.0.0
 
 cd ..
 rm -rf tmp
diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
index ccd65fb6b82..02c2583ee73 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
+++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
@@ -374,8 +374,6 @@ org.apache.flink.streaming.api.windowing.triggers.Trigger.onMerge(org.apache.fli
 org.apache.flink.streaming.api.windowing.triggers.Trigger.onProcessingTime(long, org.apache.flink.streaming.api.windowing.windows.Window, org.apache.flink.streaming.api.windowing.triggers.Trigger$TriggerContext): Argument leaf type org.apache.flink.streaming.api.windowing.triggers.Trigger$TriggerContext does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.streaming.api.windowing.triggers.Trigger.onProcessingTime(long, org.apache.flink.streaming.api.windowing.windows.Window, org.apache.flink.streaming.api.windowing.triggers.Trigger$TriggerContext): Returned leaf type org.apache.flink.streaming.api.windowing.triggers.TriggerResult does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.streaming.api.windowing.windows.TimeWindow.mergeWindows(java.util.Collection, org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner$MergeCallback): Argument leaf type org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner$MergeCallback does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
-org.apache.flink.streaming.connectors.cassandra.CassandraSink.setUidHash(java.lang.String): Returned leaf type org.apache.flink.streaming.connectors.cassandra.CassandraSink does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
-org.apache.flink.streaming.connectors.cassandra.CassandraSink.uid(java.lang.String): Returned leaf type org.apache.flink.streaming.connectors.cassandra.CassandraSink does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$KafkaTransactionState, java.lang.Object, org.apache.flink.streaming.api.functions.sink.SinkFunction$Context): Argument leaf type org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$KafkaTransactionState does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated w [...]
 org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition.dropLeaderData(java.util.List): Argument leaf type org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner.assign(org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle, int): Argument leaf type org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
@@ -386,4 +384,4 @@ org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue.getStrea
 org.apache.flink.streaming.runtime.streamrecord.LatencyMarker.getOperatorId(): Returned leaf type org.apache.flink.runtime.jobgraph.OperatorID does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware.setProcessingTimeService(org.apache.flink.streaming.runtime.tasks.ProcessingTimeService): Argument leaf type org.apache.flink.streaming.runtime.tasks.ProcessingTimeService does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.table.operations.QueryOperation.accept(org.apache.flink.table.operations.QueryOperationVisitor): Argument leaf type org.apache.flink.table.operations.QueryOperationVisitor does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
-org.apache.flink.types.parser.FieldParser.getErrorState(): Returned leaf type org.apache.flink.types.parser.FieldParser$ParseErrorState does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
\ No newline at end of file
+org.apache.flink.types.parser.FieldParser.getErrorState(): Returned leaf type org.apache.flink.types.parser.FieldParser$ParseErrorState does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
index 8ce680943af..f26ab80c5b2 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
+++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
@@ -90,6 +90,3 @@ Method <org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter.la
 Method <org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter.lambda$createInternalConverter$224afae6$9(java.lang.Object)> checks instanceof <oracle.jdbc.internal.OracleClob> in (OracleRowConverter.java:116)
 Method <org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter.lambda$createInternalConverter$224afae6$9(java.lang.Object)> checks instanceof <oracle.sql.CHAR> in (OracleRowConverter.java:115)
 Method <org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$1(java.lang.Class, org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter$JdbcDeserializationConverter, java.lang.Object)> calls method <org.postgresql.jdbc.PgArray.getArray()> in (PostgresRowConverter.java:90)
-Method <org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)> calls method <org.apache.flink.streaming.api.scala.DataStream.javaStream()> in (CassandraSink.java:205)
-Method <org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)> has generic parameter type <org.apache.flink.streaming.api.scala.DataStream<IN>> with type argument depending on <org.apache.flink.streaming.api.scala.DataStream> in (CassandraSink.java:0)
-Method <org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)> has parameter of type <org.apache.flink.streaming.api.scala.DataStream> in (CassandraSink.java:0)
diff --git a/flink-architecture-tests/flink-architecture-tests-production/pom.xml b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
index a50d5446dff..e203c020022 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/pom.xml
+++ b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
@@ -134,11 +134,6 @@ under the License.
 			<artifactId>flink-connector-aws-base</artifactId>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-files</artifactId>
diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml
index d29a90c01d8..c79875d4f78 100644
--- a/flink-architecture-tests/pom.xml
+++ b/flink-architecture-tests/pom.xml
@@ -146,13 +146,6 @@ under the License.
 				<scope>test</scope>
 			</dependency>
 
-			<dependency>
-				<groupId>org.apache.flink</groupId>
-				<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
-				<version>${project.version}</version>
-				<scope>test</scope>
-			</dependency>
-
 			<dependency>
 				<groupId>org.apache.flink</groupId>
 				<artifactId>flink-connector-files</artifactId>
diff --git a/flink-connectors/flink-connector-cassandra/archunit-violations/b7279bb1-1eb7-40c0-931d-f6db7971d126 b/flink-connectors/flink-connector-cassandra/archunit-violations/b7279bb1-1eb7-40c0-931d-f6db7971d126
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201 b/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201
deleted file mode 100644
index d4b3e56e32d..00000000000
--- a/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201
+++ /dev/null
@@ -1,6 +0,0 @@
-org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
diff --git a/flink-connectors/flink-connector-cassandra/archunit-violations/stored.rules b/flink-connectors/flink-connector-cassandra/archunit-violations/stored.rules
deleted file mode 100644
index bcf4c415550..00000000000
--- a/flink-connectors/flink-connector-cassandra/archunit-violations/stored.rules
+++ /dev/null
@@ -1,4 +0,0 @@
-#
-#Tue Feb 22 12:16:49 CET 2022
-Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=b7279bb1-1eb7-40c0-931d-f6db7971d126
-ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=dc1ba6f4-3d84-498c-a085-e02ba5936201
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
deleted file mode 100644
index 4234e815280..00000000000
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ /dev/null
@@ -1,218 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-connectors</artifactId>
-		<version>1.17-SNAPSHOT</version>
-	</parent>
-
-	<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
-	<name>Flink : Connectors : Cassandra</name>
-
-	<packaging>jar</packaging>
-
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<!--driver 3.x works with 3.x and 4.x versions of Cassandra, but driver 4.x is a complete refactoring with a different API-->
-		<driver.version>3.11.2</driver.version>
-		<guava.version>19.0</guava.version>
-	</properties>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<!-- Run shade goal on package phase -->
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<!-- This is necessary because we bundle a subset of our dependencies,
-							and transitive dependencies of that subset should still be pulled into the user jar.-->
-							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
-							<artifactSet>
-								<includes>
-									<include>com.datastax.cassandra:cassandra-driver-core:shaded</include>
-									<include>com.datastax.cassandra:cassandra-driver-mapping</include>
-									<include>com.google.guava:guava</include>
-								</includes>
-							</artifactSet>
-							<relocations>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.cassandra.shaded.com.google</shadedPattern>
-									<excludes>
-										<exclude>com.google.protobuf.**</exclude>
-										<exclude>com.google.inject.**</exclude>
-									</excludes>
-								</relocation>
-							</relocations>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<!--Using datastax provided shaded driver which includes relocated netty-->
-		<!--https://docs.datastax.com/en/developer/java-driver/3.1/manual/shaded_jar/-->
-		<dependency>
-			<groupId>com.datastax.cassandra</groupId>
-			<artifactId>cassandra-driver-core</artifactId>
-			<classifier>shaded</classifier>
-			<version>${driver.version}</version>
-			<exclusions>
-				<!--Because the shaded JAR uses the original netty POM, we still need to exclude netty explicitly-->
-				<exclusion>
-					<groupId>io.netty</groupId>
-					<artifactId>*</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>log4j-over-slf4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>ch.qos.logback</groupId>
-					<artifactId>logback-classic</artifactId>
-				</exclusion>
-				<!--The next 3 are optional dependencies and are unused-->
-				<exclusion>
-					<groupId>com.github.jnr</groupId>
-					<artifactId>jnr-ffi</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.github.jnr</groupId>
-					<artifactId>jnr-posix</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.fasterxml.jackson.core</groupId>
-					<artifactId>jackson-databind</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>com.datastax.cassandra</groupId>
-			<artifactId>cassandra-driver-mapping</artifactId>
-			<version>${driver.version}</version>
-			<exclusions>
-				<!--use the shaded driver above-->
-				<exclusion>
-					<groupId>com.datastax.cassandra</groupId>
-					<artifactId>cassandra-driver-core</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>log4j-over-slf4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>ch.qos.logback</groupId>
-					<artifactId>logback-classic</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-library</artifactId>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
-		<!-- Table ecosystem -->
-		<!-- Projects depending on this project won't depend on flink-table-*. -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-api-java-bridge</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-			<optional>true</optional>
-		</dependency>
-
-		<!-- Test dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.testcontainers</groupId>
-			<artifactId>cassandra</artifactId>
-			<scope>test</scope>
-		</dependency>
-
-		<!-- ArchUnit test dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-architecture-tests-test</artifactId>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-</project>
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraColumnarOutputFormatBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraColumnarOutputFormatBase.java
deleted file mode 100644
index cb2346ec27e..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraColumnarOutputFormatBase.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-
-import java.time.Duration;
-import java.util.concurrent.CompletionStage;
-
-/**
- * CassandraColumnarOutputFormatBase is the common abstract class for writing into Apache Cassandra
- * using column-based output formats.
- *
- * @param <OUT> Type of the elements to write.
- */
-abstract class CassandraColumnarOutputFormatBase<OUT>
-        extends CassandraOutputFormatBase<OUT, ResultSet> {
-    private final String insertQuery;
-    private transient PreparedStatement prepared;
-
-    public CassandraColumnarOutputFormatBase(
-            String insertQuery,
-            ClusterBuilder builder,
-            int maxConcurrentRequests,
-            Duration maxConcurrentRequestsTimeout) {
-        super(builder, maxConcurrentRequests, maxConcurrentRequestsTimeout);
-        Preconditions.checkArgument(
-                !Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty");
-        this.insertQuery = insertQuery;
-    }
-
-    @Override
-    protected void postOpen() {
-        super.postOpen();
-        this.prepared = session.prepare(insertQuery);
-    }
-
-    @Override
-    protected CompletionStage<ResultSet> send(OUT record) {
-        Object[] fields = extractFields(record);
-        final ResultSetFuture result = session.executeAsync(prepared.bind(fields));
-        return listenableFutureToCompletableFuture(result);
-    }
-
-    protected abstract Object[] extractFields(OUT record);
-}
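
For context, the class above prepares the insert query once in postOpen() and then binds it per record in send(). A minimal standalone sketch of that prepare-once/bind-per-record pattern against the DataStax 3.x driver (contact point, keyspace, table and values are illustrative assumptions):

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.ResultSetFuture;
    import com.datastax.driver.core.Session;

    public class PrepareBindSketch {
        public static void main(String[] args) {
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                    Session session = cluster.connect()) {
                // prepare once (postOpen above) ...
                PreparedStatement prepared =
                        session.prepare("INSERT INTO ks.tbl (id, val) VALUES (?, ?);");
                // ... then bind and execute asynchronously per record (send above)
                ResultSetFuture future = session.executeAsync(prepared.bind("k1", 42));
                future.getUninterruptibly(); // block only for the sake of the demo
            }
        }
    }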
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
deleted file mode 100644
index c1c5bfd3a35..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-
-import java.io.IOException;
-
-/**
- * InputFormat to read data from Apache Cassandra and generate {@link Tuple}s.
- *
- * @param <OUT> type of Tuple
- */
-public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT> {
-
-    private static final long serialVersionUID = 3642323148032444264L;
-    private transient ResultSet resultSet;
-
-    public CassandraInputFormat(String query, ClusterBuilder builder) {
-        super(query, builder);
-    }
-
-    /**
-     * Opens a Session and executes the query.
-     *
-     * @param ignored the input split is ignored, because this input format is not parallelizable
-     * @throws IOException if opening the session or executing the query fails
-     */
-    @Override
-    public void open(InputSplit ignored) throws IOException {
-        this.session = cluster.connect();
-        this.resultSet = session.execute(query);
-    }
-
-    @Override
-    public boolean reachedEnd() throws IOException {
-        return resultSet.isExhausted();
-    }
-
-    @Override
-    public OUT nextRecord(OUT reuse) throws IOException {
-        final Row item = resultSet.one();
-        for (int i = 0; i < reuse.getArity(); i++) {
-            reuse.setField(item.getObject(i), i);
-        }
-        return reuse;
-    }
-}
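
As a usage illustration, reading tuples with this (now removed) input format looked roughly as follows on the DataSet API; the contact point, query and schema are assumptions made for the sketch:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
    import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

    import com.datastax.driver.core.Cluster;

    public class ReadTuplesSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            ClusterBuilder builder = new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder b) {
                    return b.addContactPoint("127.0.0.1").build();
                }
            };
            DataSet<Tuple2<String, Long>> rows = env.createInput(
                    new CassandraInputFormat<Tuple2<String, Long>>(
                            "SELECT word, cnt FROM ks.word_count;", builder),
                    TupleTypeInfo.getBasicTupleTypeInfo(String.class, Long.class));
            rows.print();
        }
    }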
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
deleted file mode 100644
index 5b0964968f0..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for {@link RichInputFormat} to read data from Apache Cassandra and generate a custom
- * Cassandra annotated object.
- *
- * @param <OUT> type of inputClass
- */
-public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit>
-        implements NonParallelInput {
-    private static final long serialVersionUID = -1519372881115104601L;
-    protected final Logger logger = LoggerFactory.getLogger(getClass());
-
-    protected final String query;
-    private final ClusterBuilder builder;
-
-    protected transient Cluster cluster;
-    protected transient Session session;
-
-    public CassandraInputFormatBase(String query, ClusterBuilder builder) {
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
-        Preconditions.checkNotNull(builder, "Builder cannot be null");
-
-        this.query = query;
-        this.builder = builder;
-    }
-
-    @Override
-    public void configure(Configuration parameters) {
-        this.cluster = builder.getCluster();
-    }
-
-    @Override
-    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-        return cachedStatistics;
-    }
-
-    @Override
-    public InputSplit[] createInputSplits(int minNumSplits) {
-        return new GenericInputSplit[] {new GenericInputSplit(0, 1)};
-    }
-
-    @Override
-    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
-        return new DefaultInputSplitAssigner(inputSplits);
-    }
-
-    /** Closes all resources used. */
-    @Override
-    public void close() {
-        try {
-            if (session != null) {
-                session.close();
-            }
-        } catch (Exception e) {
-            logger.error("Error while closing session.", e);
-        }
-
-        try {
-            if (cluster != null) {
-                cluster.close();
-            }
-        } catch (Exception e) {
-            logger.error("Error while closing cluster.", e);
-        }
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
deleted file mode 100644
index b0dab96c3a7..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-/**
- * OutputFormat to write Flink {@link Tuple}s into a Cassandra cluster.
- *
- * @param <OUT> Type of {@link Tuple} to write to Cassandra.
- * @deprecated Please use CassandraTupleOutputFormat instead.
- */
-@Deprecated
-public class CassandraOutputFormat<OUT extends Tuple> extends CassandraTupleOutputFormat<OUT> {
-
-    public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
-        super(insertQuery, builder);
-    }
-}
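
Since the class is a pure alias, migrating off the deprecated name is a drop-in rename (reusing the illustrative ClusterBuilder and schema from the sketch above):

    // deprecated alias, kept only for API compatibility:
    CassandraOutputFormat<Tuple2<String, Long>> deprecated =
            new CassandraOutputFormat<>(
                    "INSERT INTO ks.word_count (word, cnt) VALUES (?, ?);", builder);

    // drop-in replacement with identical constructor arguments:
    CassandraTupleOutputFormat<Tuple2<String, Long>> replacement =
            new CassandraTupleOutputFormat<>(
                    "INSERT INTO ks.word_count (word, cnt) VALUES (?, ?);", builder);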
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java
deleted file mode 100644
index ca8663a6988..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import org.apache.flink.api.common.io.OutputFormatBase;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * CassandraOutputFormatBase is the common abstract class for writing into Apache Cassandra using
- * output formats.
- *
- * <p>In case you experience the following error: {@code Error while sending value.
- * com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query
- * at consistency LOCAL_ONE (1 replica were required but only 0 acknowledged the write)},
- *
- * <p>it is recommended to increase the Cassandra write timeout to match your workload, so that
- * such timeout errors do not happen. For that, raise the write_request_timeout_in_ms parameter in
- * your cassandra.yaml. This exception means that the Cassandra coordinator node waited too long
- * for an internal replication (replication to another node) and did not acknowledge the write. It
- * is not recommended to lower the replication factor of your Cassandra cluster, because it is what
- * prevents losing data in case of a cluster failure. Waiting for a single replica to acknowledge
- * the write is the minimum level for this guarantee in Cassandra.
- *
- * @param <OUT> Type of the elements to write.
- */
-abstract class CassandraOutputFormatBase<OUT, V> extends OutputFormatBase<OUT, V> {
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormatBase.class);
-
-    private final ClusterBuilder builder;
-    private transient Cluster cluster;
-    protected transient Session session;
-
-    public CassandraOutputFormatBase(
-            ClusterBuilder builder,
-            int maxConcurrentRequests,
-            Duration maxConcurrentRequestsTimeout) {
-        super(maxConcurrentRequests, maxConcurrentRequestsTimeout);
-        Preconditions.checkNotNull(builder, "Builder cannot be null");
-        this.builder = builder;
-    }
-
-    /** Configure the connection to Cassandra. */
-    @Override
-    public void configure(Configuration parameters) {
-        this.cluster = builder.getCluster();
-    }
-
-    /** Opens a Session to Cassandra. */
-    @Override
-    protected void postOpen() {
-        this.session = cluster.connect();
-    }
-
-    /** Closes all resources used by Cassandra connection. */
-    @Override
-    protected void postClose() {
-        try {
-            if (session != null) {
-                session.close();
-            }
-        } catch (Exception e) {
-            LOG.error("Error while closing session.", e);
-        }
-        try {
-            if (cluster != null) {
-                cluster.close();
-            }
-        } catch (Exception e) {
-            LOG.error("Error while closing cluster.", e);
-        }
-    }
-
-    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
-            final ListenableFuture<T> listenableFuture) {
-        CompletableFuture<T> completable = new CompletableFuture<T>();
-        Futures.addCallback(listenableFuture, new CompletableFutureCallback<>(completable));
-        return completable;
-    }
-
-    private static class CompletableFutureCallback<T> implements FutureCallback<T> {
-
-        private final CompletableFuture<T> completableFuture;
-
-        public CompletableFutureCallback(CompletableFuture<T> completableFuture) {
-            this.completableFuture = completableFuture;
-        }
-
-        @Override
-        public void onSuccess(@Nullable T result) {
-            completableFuture.complete(result);
-        }
-
-        @Override
-        public void onFailure(Throwable throwable) {
-            completableFuture.completeExceptionally(throwable);
-        }
-    }
-}
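
The ListenableFuture-to-CompletableFuture bridge above is generic; a runnable sketch of the same callback pattern against Guava 19, the version pinned in the removed pom (SettableFuture stands in for the driver's ResultSetFuture):

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.SettableFuture;

    import java.util.concurrent.CompletableFuture;

    public class FutureBridgeSketch {
        static <T> CompletableFuture<T> toCompletable(ListenableFuture<T> f) {
            CompletableFuture<T> completable = new CompletableFuture<>();
            Futures.addCallback(f, new FutureCallback<T>() {
                @Override public void onSuccess(T result) { completable.complete(result); }
                @Override public void onFailure(Throwable t) { completable.completeExceptionally(t); }
            });
            return completable;
        }

        public static void main(String[] args) throws Exception {
            SettableFuture<String> source = SettableFuture.create();
            CompletableFuture<String> target = toCompletable(source);
            source.set("ok");                // completing the Guava future ...
            System.out.println(target.get()); // ... completes the bridged one: prints "ok"
        }
    }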
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java
deleted file mode 100644
index e484e372417..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
-import org.apache.flink.util.Preconditions;
-
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.datastax.driver.mapping.Result;
-
-/**
- * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
- *
- * @param <OUT> type of inputClass
- */
-public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> {
-
-    private static final long serialVersionUID = 1992091320180905115L;
-
-    private transient Result<OUT> resultSet;
-    private final MapperOptions mapperOptions;
-    private final Class<OUT> inputClass;
-
-    public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass) {
-        this(query, builder, inputClass, null);
-    }
-
-    public CassandraPojoInputFormat(
-            String query,
-            ClusterBuilder builder,
-            Class<OUT> inputClass,
-            MapperOptions mapperOptions) {
-        super(query, builder);
-        this.mapperOptions = mapperOptions;
-        this.inputClass = Preconditions.checkNotNull(inputClass, "InputClass cannot be null");
-    }
-
-    @Override
-    public void open(InputSplit split) {
-        this.session = cluster.connect();
-        MappingManager manager = new MappingManager(session);
-
-        Mapper<OUT> mapper = manager.mapper(inputClass);
-
-        if (mapperOptions != null) {
-            Mapper.Option[] optionsArray = mapperOptions.getMapperOptions();
-            if (optionsArray != null) {
-                mapper.setDefaultGetOptions(optionsArray);
-            }
-        }
-        this.resultSet = mapper.map(session.execute(query));
-    }
-
-    @Override
-    public boolean reachedEnd() {
-        return resultSet.isExhausted();
-    }
-
-    @Override
-    public OUT nextRecord(OUT reuse) {
-        return resultSet.one();
-    }
-}
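
The input class handed to this format must carry the DataStax mapping annotations. A minimal illustrative POJO (keyspace, table and column names are assumptions):

    import com.datastax.driver.mapping.annotations.Column;
    import com.datastax.driver.mapping.annotations.Table;

    @Table(keyspace = "ks", name = "users")
    public class User {
        @Column(name = "id")
        private String id;

        @Column(name = "name")
        private String name;

        public User() {} // the mapper requires a no-arg constructor

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

With that in place, new CassandraPojoInputFormat<>("SELECT * FROM ks.users;", builder, User.class) maps each fetched row to a User instance.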
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoOutputFormat.java
deleted file mode 100644
index 6c3f754d501..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoOutputFormat.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
-import org.apache.flink.util.Preconditions;
-
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.time.Duration;
-import java.util.concurrent.CompletionStage;
-
-/**
- * OutputFormat to write data to Apache Cassandra from a custom Cassandra annotated object.
- * Please read the recommendations in {@linkplain CassandraOutputFormatBase}.
- *
- * @param <OUT> type of outputClass
- */
-public class CassandraPojoOutputFormat<OUT> extends CassandraOutputFormatBase<OUT, Void> {
-
-    private static final long serialVersionUID = -1701885135103942460L;
-
-    private final MapperOptions mapperOptions;
-    private final Class<OUT> outputClass;
-    private transient Mapper<OUT> mapper;
-
-    public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass) {
-        this(builder, outputClass, null);
-    }
-
-    public CassandraPojoOutputFormat(
-            ClusterBuilder builder, Class<OUT> outputClass, MapperOptions mapperOptions) {
-        this(
-                builder,
-                outputClass,
-                mapperOptions,
-                Integer.MAX_VALUE,
-                Duration.ofMillis(Long.MAX_VALUE));
-    }
-
-    public CassandraPojoOutputFormat(
-            ClusterBuilder builder,
-            Class<OUT> outputClass,
-            MapperOptions mapperOptions,
-            int maxConcurrentRequests,
-            Duration maxConcurrentRequestsTimeout) {
-        super(builder, maxConcurrentRequests, maxConcurrentRequestsTimeout);
-        Preconditions.checkNotNull(outputClass, "OutputClass cannot be null");
-        this.mapperOptions = mapperOptions;
-        this.outputClass = outputClass;
-    }
-
-    /** Opens a Session to Cassandra and initializes the prepared statement. */
-    @Override
-    protected void postOpen() {
-        super.postOpen();
-        MappingManager mappingManager = new MappingManager(session);
-        this.mapper = mappingManager.mapper(outputClass);
-        if (mapperOptions != null) {
-            Mapper.Option[] optionsArray = mapperOptions.getMapperOptions();
-            if (optionsArray != null) {
-                mapper.setDefaultSaveOptions(optionsArray);
-            }
-        }
-    }
-
-    @Override
-    protected CompletionStage<Void> send(OUT record) {
-        final ListenableFuture<Void> result = mapper.saveAsync(record);
-        return listenableFutureToCompletableFuture(result);
-    }
-
-    /** Closes all resources used. */
-    @Override
-    protected void postClose() {
-        super.postClose();
-        mapper = null;
-    }
-}
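
MapperOptions declares a single method, so a lambda suffices to tweak the save behaviour; a sketch reusing the illustrative User POJO from above (the chosen options exist on the 3.x mapper, their values are illustrative):

    CassandraPojoOutputFormat<User> format = new CassandraPojoOutputFormat<>(
            builder,
            User.class,
            () -> new Mapper.Option[] {
                Mapper.Option.saveNullFields(false), // skip null fields instead of writing tombstones
                Mapper.Option.ttl(86400)             // expire written rows after one day
            });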
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java
deleted file mode 100644
index 0e7a5712211..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.types.Row;
-
-import java.time.Duration;
-
-/**
- * OutputFormat to write Flink {@link Row}s into a Cassandra cluster. Please read the
- * recommendations in {@linkplain CassandraOutputFormatBase}.
- */
-public class CassandraRowOutputFormat extends CassandraColumnarOutputFormatBase<Row> {
-
-    public CassandraRowOutputFormat(String insertQuery, ClusterBuilder builder) {
-        this(insertQuery, builder, Integer.MAX_VALUE, Duration.ofMillis(Long.MAX_VALUE));
-    }
-
-    public CassandraRowOutputFormat(
-            String insertQuery,
-            ClusterBuilder builder,
-            int maxConcurrentRequests,
-            Duration maxConcurrentRequestsTimeout) {
-        super(insertQuery, builder, maxConcurrentRequests, maxConcurrentRequestsTimeout);
-    }
-
-    @Override
-    protected Object[] extractFields(Row record) {
-
-        Object[] fields = new Object[record.getArity()];
-        for (int i = 0; i < fields.length; i++) {
-            fields[i] = record.getField(i);
-        }
-        return fields;
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java
deleted file mode 100644
index 9c038c5be6f..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import java.time.Duration;
-
-/**
- * OutputFormat to write Flink {@link Tuple}s into a Cassandra cluster. Please read the
- * recommendations in {@linkplain CassandraOutputFormatBase}.
- *
- * @param <OUT> Type of {@link Tuple} to write to Cassandra.
- */
-public class CassandraTupleOutputFormat<OUT extends Tuple>
-        extends CassandraColumnarOutputFormatBase<OUT> {
-
-    public CassandraTupleOutputFormat(String insertQuery, ClusterBuilder builder) {
-        this(insertQuery, builder, Integer.MAX_VALUE, Duration.ofMillis(Long.MAX_VALUE));
-    }
-
-    public CassandraTupleOutputFormat(
-            String insertQuery,
-            ClusterBuilder builder,
-            int maxConcurrentRequests,
-            Duration maxConcurrentRequestsTimeout) {
-        super(insertQuery, builder, maxConcurrentRequests, maxConcurrentRequestsTimeout);
-    }
-
-    @Override
-    protected Object[] extractFields(OUT record) {
-        Object[] fields = new Object[record.getArity()];
-        for (int i = 0; i < fields.length; i++) {
-            fields[i] = record.getField(i);
-        }
-        return fields;
-    }
-}
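
Wired into a batch job, the tuple output format was used like this (schema and builder are the illustrative ones from the sketches above):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(Tuple2.of("flink", 1L), Tuple2.of("cassandra", 2L))
            .output(new CassandraTupleOutputFormat<Tuple2<String, Long>>(
                    "INSERT INTO ks.word_count (word, cnt) VALUES (?, ?);", builder));
    env.execute("write tuples to Cassandra");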
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
deleted file mode 100644
index 6cfe7ae40a0..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.configuration.Configuration;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * Abstract sink to write tuple-like values into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
-    private final String insertQuery;
-    private transient PreparedStatement ps;
-    private final boolean ignoreNullFields;
-
-    public AbstractCassandraTupleSink(
-            String insertQuery,
-            ClusterBuilder builder,
-            CassandraSinkBaseConfig config,
-            CassandraFailureHandler failureHandler) {
-        super(builder, config, failureHandler);
-        this.insertQuery = insertQuery;
-        this.ignoreNullFields = config.getIgnoreNullFields();
-    }
-
-    @Override
-    public void open(Configuration configuration) {
-        super.open(configuration);
-        this.ps = session.prepare(insertQuery);
-    }
-
-    @Override
-    public ListenableFuture<ResultSet> send(IN value) {
-        Object[] fields = extract(value);
-        return session.executeAsync(bind(fields));
-    }
-
-    private BoundStatement bind(Object[] fields) {
-        BoundStatement bs = ps.bind(fields);
-        if (ignoreNullFields) {
-            for (int i = 0; i < fields.length; i++) {
-                if (fields[i] == null) {
-                    bs.unset(i);
-                }
-            }
-        }
-        return bs;
-    }
-
-    protected abstract Object[] extract(IN record);
-}
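
The bind() helper above first binds all fields and then unsets the null ones, so Cassandra leaves those columns untouched instead of writing tombstones. Enabling that behaviour goes through the sink config; a sketch, assuming the builder setter mirrors the getIgnoreNullFields() accessor used above:

    CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
            .setIgnoreNullFields(true) // assumed setter matching getIgnoreNullFields()
            .build();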
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
deleted file mode 100644
index 5a42a7ce721..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.utils.TableConnectorUtils;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Properties;
-
-/** An {@link AppendStreamTableSink} to write an append stream Table to a Cassandra table. */
-public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
-
-    private final ClusterBuilder builder;
-    private final String cql;
-    private String[] fieldNames;
-    private TypeInformation[] fieldTypes;
-    private final Properties properties;
-
-    public CassandraAppendTableSink(ClusterBuilder builder, String cql) {
-        this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
-        this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
-        this.properties = new Properties();
-    }
-
-    public CassandraAppendTableSink(ClusterBuilder builder, String cql, Properties properties) {
-        this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
-        this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
-        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
-    }
-
-    @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes);
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return this.fieldNames;
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return this.fieldTypes;
-    }
-
-    @Override
-    public CassandraAppendTableSink configure(
-            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        CassandraAppendTableSink cassandraTableSink =
-                new CassandraAppendTableSink(this.builder, this.cql, this.properties);
-        cassandraTableSink.fieldNames =
-                Preconditions.checkNotNull(fieldNames, "Field names must not be null.");
-        cassandraTableSink.fieldTypes =
-                Preconditions.checkNotNull(fieldTypes, "Field types must not be null.");
-        Preconditions.checkArgument(
-                fieldNames.length == fieldTypes.length,
-                "Number of provided field names and types does not match.");
-        return cassandraTableSink;
-    }
-
-    @Override
-    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
-        if (!(dataStream.getType() instanceof RowTypeInfo)) {
-            throw new TableException(
-                    "No support for the type of the given DataStream: " + dataStream.getType());
-        }
-
-        CassandraRowSink sink =
-                new CassandraRowSink(
-                        dataStream.getType().getArity(),
-                        cql,
-                        builder,
-                        CassandraSinkBaseConfig.newBuilder().build(),
-                        new NoOpCassandraFailureHandler());
-
-        return dataStream
-                .addSink(sink)
-                .setParallelism(dataStream.getParallelism())
-                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
-    }
-}
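
Under the legacy TableSink API, configure() returns a configured copy rather than mutating the instance; a brief sketch (field names, types and CQL are illustrative):

    CassandraAppendTableSink sink = new CassandraAppendTableSink(
            builder, "INSERT INTO ks.word_count (word, cnt) VALUES (?, ?);");
    CassandraAppendTableSink configured = sink.configure(
            new String[] {"word", "cnt"},
            new TypeInformation<?>[] {Types.STRING, Types.LONG});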
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
deleted file mode 100644
index 5188e1f14e1..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * CheckpointCommitter that saves information about completed checkpoints within a separate table in
- * a cassandra database.
- *
- * <p>Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
- */
-public class CassandraCommitter extends CheckpointCommitter {
-
-    private static final long serialVersionUID = 1L;
-
-    private final ClusterBuilder builder;
-    private transient Cluster cluster;
-    private transient Session session;
-
-    private String keySpace = "flink_auxiliary";
-    private String table = "checkpoints_";
-
-    /**
-     * A cache of the last committed checkpoint ids per subtask index. This is used to avoid
-     * redundant round-trips to Cassandra (see {@link #isCheckpointCommitted(int, long)}.
-     */
-    private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap<>();
-
-    public CassandraCommitter(ClusterBuilder builder) {
-        this.builder = builder;
-        ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-    }
-
-    public CassandraCommitter(ClusterBuilder builder, String keySpace) {
-        this(builder);
-        this.keySpace = keySpace;
-    }
-
-    /** Internally used to set the job ID after instantiation. */
-    public void setJobId(String id) throws Exception {
-        super.setJobId(id);
-        table += id;
-    }
-
-    /**
-     * Generates the necessary tables to store information.
-     *
-     * @throws Exception if the keyspace or the checkpoint table cannot be created
-     */
-    @Override
-    public void createResource() throws Exception {
-        cluster = builder.getCluster();
-        session = cluster.connect();
-
-        session.execute(
-                String.format(
-                        "CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':1};",
-                        keySpace));
-        session.execute(
-                String.format(
-                        "CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));",
-                        keySpace, table));
-
-        try {
-            session.close();
-        } catch (Exception e) {
-            LOG.error("Error while closing session.", e);
-        }
-        try {
-            cluster.close();
-        } catch (Exception e) {
-            LOG.error("Error while closing cluster.", e);
-        }
-    }
-
-    @Override
-    public void open() throws Exception {
-        if (builder == null) {
-            throw new RuntimeException("No ClusterBuilder was set.");
-        }
-        cluster = builder.getCluster();
-        session = cluster.connect();
-    }
-
-    @Override
-    public void close() throws Exception {
-        this.lastCommittedCheckpoints.clear();
-        try {
-            session.close();
-        } catch (Exception e) {
-            LOG.error("Error while closing session.", e);
-        }
-        try {
-            cluster.close();
-        } catch (Exception e) {
-            LOG.error("Error while closing cluster.", e);
-        }
-    }
-
-    @Override
-    public void commitCheckpoint(int subtaskIdx, long checkpointId) {
-        String statement =
-                String.format(
-                        "UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;",
-                        keySpace, table, checkpointId, operatorId, subtaskIdx);
-
-        session.execute(statement);
-        lastCommittedCheckpoints.put(subtaskIdx, checkpointId);
-    }
-
-    @Override
-    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
-        // Pending checkpointed buffers are committed in ascending order of their
-        // checkpoint id. This way we can tell if a checkpointed buffer was committed
-        // just by asking the third-party storage system for the last checkpoint id
-        // committed by the specified subtask.
-
-        Long lastCommittedCheckpoint = lastCommittedCheckpoints.get(subtaskIdx);
-        if (lastCommittedCheckpoint == null) {
-            String statement =
-                    String.format(
-                            "SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;",
-                            keySpace, table, operatorId, subtaskIdx);
-
-            Iterator<Row> resultIt = session.execute(statement).iterator();
-            if (resultIt.hasNext()) {
-                lastCommittedCheckpoint = resultIt.next().getLong("checkpoint_id");
-                lastCommittedCheckpoints.put(subtaskIdx, lastCommittedCheckpoint);
-            }
-        }
-        return lastCommittedCheckpoint != null && checkpointId <= lastCommittedCheckpoint;
-    }
-}
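
A sketch of the committer's lifecycle as driven by the write-ahead sink machinery; setOperatorId() is assumed to be the setter inherited from CheckpointCommitter, and the ids are illustrative:

    CassandraCommitter committer = new CassandraCommitter(builder, "flink_auxiliary");
    committer.setJobId("0123456789abcdef");    // table name becomes checkpoints_0123456789abcdef
    committer.setOperatorId("cassandra-sink"); // assumed inherited setter
    committer.createResource();                // CREATE KEYSPACE / TABLE ... IF NOT EXISTS
    committer.open();
    committer.commitCheckpoint(0, 42L);
    committer.isCheckpointCommitted(0, 42L);   // true, answered from the local cache
    committer.close();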
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
deleted file mode 100644
index 9ceaaf40a30..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraFailureHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * An implementation of {@link CassandraFailureHandler} is provided by the user to define how a
- * {@link Throwable Throwable} should be handled, e.g. dropping it if the failure is only temporary.
- *
- * <p>Example:
- *
- * <pre>{@code
- * 	private static class ExampleFailureHandler implements CassandraFailureHandler {
- *
- * 		@Override
- * 		public void onFailure(Throwable failure) throws IOException {
- * 			if (ExceptionUtils.findThrowable(failure, WriteTimeoutException.class).isPresent()) {
- * 				// drop exception
- * 			} else {
- * 				// for all other failures, fail the sink;
- * 				// here the failure is wrapped and rethrown, but users can also choose to throw custom exceptions
- * 				throw new IOException(failure);
- * 			}
- * 			}
- * 		}
- * 	}
- *
- * }</pre>
- *
- * <p>The above example will let the sink ignore the WriteTimeoutException, without failing the
- * sink. For all other failures, the sink will fail.
- */
-@PublicEvolving
-public interface CassandraFailureHandler extends Serializable {
-
-    /**
-     * Handle a failed {@link Throwable}.
-     *
-     * @param failure the cause of failure
-     * @throws IOException if the sink should fail on this failure, the implementation should
-     *     rethrow the throwable or a custom one
-     */
-    void onFailure(Throwable failure) throws IOException;
-}
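
A compilable version of the handler sketched in the javadoc above, dropping write timeouts and failing the sink on everything else (the class name is illustrative):

    import org.apache.flink.util.ExceptionUtils;

    import com.datastax.driver.core.exceptions.WriteTimeoutException;

    import java.io.IOException;

    public class DropWriteTimeoutsHandler implements CassandraFailureHandler {
        @Override
        public void onFailure(Throwable failure) throws IOException {
            if (ExceptionUtils.findThrowable(failure, WriteTimeoutException.class).isPresent()) {
                return; // transient write timeout: drop it and keep the sink running
            }
            throw new IOException("Cassandra sink failure", failure);
        }
    }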
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
deleted file mode 100644
index 8a03c395e60..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.configuration.Configuration;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import javax.annotation.Nullable;
-
-/**
- * Flink Sink to save data into a Cassandra cluster using <a
- * href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
- * which uses annotations from <a
- * href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
- * com.datastax.driver.mapping.annotations</a>. Please read the recommendations in {@linkplain
- * CassandraSinkBase}.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
-
-    private static final long serialVersionUID = 1L;
-
-    protected final Class<IN> clazz;
-    private final MapperOptions options;
-    private final String keyspace;
-    protected transient Mapper<IN> mapper;
-    protected transient MappingManager mappingManager;
-
-    /**
-     * The main constructor for creating CassandraPojoSink.
-     *
-     * @param clazz the class of the POJO elements handled by this sink
-     * @param builder builder for the Cassandra cluster connection
-     */
-    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
-        this(clazz, builder, null, null);
-    }
-
-    public CassandraPojoSink(
-            Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options) {
-        this(clazz, builder, options, null);
-    }
-
-    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, String keyspace) {
-        this(clazz, builder, null, keyspace);
-    }
-
-    public CassandraPojoSink(
-            Class<IN> clazz,
-            ClusterBuilder builder,
-            @Nullable MapperOptions options,
-            String keyspace) {
-        this(clazz, builder, options, keyspace, CassandraSinkBaseConfig.newBuilder().build());
-    }
-
-    CassandraPojoSink(
-            Class<IN> clazz,
-            ClusterBuilder builder,
-            @Nullable MapperOptions options,
-            String keyspace,
-            CassandraSinkBaseConfig config) {
-        this(clazz, builder, options, keyspace, config, new NoOpCassandraFailureHandler());
-    }
-
-    CassandraPojoSink(
-            Class<IN> clazz,
-            ClusterBuilder builder,
-            @Nullable MapperOptions options,
-            String keyspace,
-            CassandraSinkBaseConfig config,
-            CassandraFailureHandler failureHandler) {
-        super(builder, config, failureHandler);
-        this.clazz = clazz;
-        this.options = options;
-        this.keyspace = keyspace;
-    }
-
-    @Override
-    public void open(Configuration configuration) {
-        super.open(configuration);
-        try {
-            this.mappingManager = new MappingManager(session);
-            this.mapper = mappingManager.mapper(clazz);
-            if (options != null) {
-                Mapper.Option[] optionsArray = options.getMapperOptions();
-                if (optionsArray != null) {
-                    this.mapper.setDefaultSaveOptions(optionsArray);
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Cannot create CassandraPojoSink with input: " + clazz.getSimpleName(), e);
-        }
-    }
-
-    @Override
-    protected Session createSession() {
-        return cluster.connect(keyspace);
-    }
-
-    @Override
-    public ListenableFuture<ResultSet> send(IN value) {
-        return session.executeAsync(mapper.saveQuery(value));
-    }
-}
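
For reference, the POJO sink removed above was driven entirely by the DataStax
mapping annotations. A minimal usage sketch of the removed API (the demo
keyspace, the messages table, and the Message class below are hypothetical):

    import com.datastax.driver.mapping.annotations.Column;
    import com.datastax.driver.mapping.annotations.Table;

    // Hypothetical POJO mapped onto an existing table demo.messages.
    @Table(keyspace = "demo", name = "messages")
    public class Message {
        @Column(name = "body")
        private String body;

        public Message() {} // the DataStax mapper requires a no-arg constructor

        public String getBody() { return body; }

        public void setBody(String body) { this.body = body; }
    }

    // Given a DataStream<Message> stream, the sink was attached via the builder:
    CassandraSink.addSink(stream)
            .setHost("127.0.0.1") // port defaults to 9042
            .build();
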
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
deleted file mode 100644
index 6d474873618..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.types.Row;
-
-/**
- * A SinkFunction to write Row records into a Cassandra table. Please read the recommendations in
- * {@linkplain CassandraSinkBase}.
- */
-public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
-
-    private final int rowArity;
-
-    public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder) {
-        this(rowArity, insertQuery, builder, CassandraSinkBaseConfig.newBuilder().build());
-    }
-
-    CassandraRowSink(
-            int rowArity,
-            String insertQuery,
-            ClusterBuilder builder,
-            CassandraSinkBaseConfig config) {
-        this(rowArity, insertQuery, builder, config, new NoOpCassandraFailureHandler());
-    }
-
-    CassandraRowSink(
-            int rowArity,
-            String insertQuery,
-            ClusterBuilder builder,
-            CassandraSinkBaseConfig config,
-            CassandraFailureHandler failureHandler) {
-        super(insertQuery, builder, config, failureHandler);
-        this.rowArity = rowArity;
-    }
-
-    @Override
-    protected Object[] extract(Row record) {
-        Object[] al = new Object[rowArity];
-        for (int i = 0; i < rowArity; i++) {
-            al[i] = record.getField(i);
-        }
-        return al;
-    }
-}
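
A sketch of constructing the removed Row sink directly; the contact point and
the demo.events schema are hypothetical. The rowArity argument must match the
number of bind markers in the insert query:

    import com.datastax.driver.core.Cluster;
    import org.apache.flink.types.Row;

    // Given a DataStream<Row> rowStream whose rows have two fields:
    rowStream.addSink(
            new CassandraRowSink(
                    2, // rowArity: one slot per bind marker below
                    "INSERT INTO demo.events (id, payload) VALUES (?, ?);",
                    new ClusterBuilder() {
                        @Override
                        protected Cluster buildCluster(Cluster.Builder builder) {
                            return builder.addContactPoint("127.0.0.1").build();
                        }
                    }));
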
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
deleted file mode 100644
index 3a2ecd8dfbf..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
-import org.apache.flink.types.Row;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Sink that emits its input elements into a Cassandra table. This sink stores incoming records
- * within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to
- * Cassandra if a checkpoint is completed.
- */
-public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
-    private static final long serialVersionUID = 1L;
-
-    protected transient Cluster cluster;
-    protected transient Session session;
-
-    private final String insertQuery;
-    private transient PreparedStatement preparedStatement;
-
-    private ClusterBuilder builder;
-
-    private int arity;
-    private transient Object[] fields;
-
-    protected CassandraRowWriteAheadSink(
-            String insertQuery,
-            TypeSerializer<Row> serializer,
-            ClusterBuilder builder,
-            CheckpointCommitter committer)
-            throws Exception {
-        super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
-        this.insertQuery = insertQuery;
-        this.builder = builder;
-        ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-    }
-
-    public void open() throws Exception {
-        super.open();
-        if (!getRuntimeContext().isCheckpointingEnabled()) {
-            throw new IllegalStateException(
-                    "The write-ahead log requires checkpointing to be enabled.");
-        }
-        cluster = builder.getCluster();
-        session = cluster.connect();
-        preparedStatement = session.prepare(insertQuery);
-
-        arity = ((RowSerializer) serializer).getArity();
-        fields = new Object[arity];
-    }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-        try {
-            if (session != null) {
-                session.close();
-            }
-        } catch (Exception e) {
-            LOG.error("Error while closing session.", e);
-        }
-        try {
-            if (cluster != null) {
-                cluster.close();
-            }
-        } catch (Exception e) {
-            LOG.error("Error while closing cluster.", e);
-        }
-    }
-
-    @Override
-    protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp)
-            throws Exception {
-        final AtomicInteger updatesCount = new AtomicInteger(0);
-        final AtomicInteger updatesConfirmed = new AtomicInteger(0);
-
-        final AtomicReference<Throwable> exception = new AtomicReference<>();
-
-        FutureCallback<ResultSet> callback =
-                new FutureCallback<ResultSet>() {
-                    @Override
-                    public void onSuccess(ResultSet resultSet) {
-                        updatesConfirmed.incrementAndGet();
-                        if (updatesCount.get() > 0) { // only set if all updates have been sent
-                            if (updatesCount.get() == updatesConfirmed.get()) {
-                                synchronized (updatesConfirmed) {
-                                    updatesConfirmed.notifyAll();
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        if (exception.compareAndSet(null, throwable)) {
-                            LOG.error("Error while sending value.", throwable);
-                            synchronized (updatesConfirmed) {
-                                updatesConfirmed.notifyAll();
-                            }
-                        }
-                    }
-                };
-
-        // set values for prepared statement
-        int updatesSent = 0;
-        for (Row value : values) {
-            for (int x = 0; x < arity; x++) {
-                fields[x] = value.getField(x);
-            }
-            // insert values and send to cassandra
-            BoundStatement s = preparedStatement.bind(fields);
-            s.setDefaultTimestamp(timestamp);
-            ResultSetFuture result = session.executeAsync(s);
-            updatesSent++;
-            if (result != null) {
-                // add callback to detect errors
-                Futures.addCallback(result, callback);
-            }
-        }
-        updatesCount.set(updatesSent);
-
-        synchronized (updatesConfirmed) {
-            while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
-                updatesConfirmed.wait();
-            }
-        }
-
-        if (exception.get() != null) {
-            LOG.warn("Sending a value failed.", exception.get());
-            return false;
-        } else {
-            return true;
-        }
-    }
-}
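
Both write-ahead variants refuse to run without checkpointing (see the open()
check above) and were reached through the CassandraSink builder rather than
constructed directly, since their constructors are protected. Roughly, with
hypothetical env/rowStream variables, host, and schema:

    env.enableCheckpointing(5000); // the write-ahead path requires checkpointing

    CassandraSink.addSink(rowStream)
            .setQuery("INSERT INTO demo.events (id, payload) VALUES (?, ?);")
            .setHost("127.0.0.1")
            .enableWriteAheadLog() // records are committed only on completed checkpoints
            .build();
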
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
deleted file mode 100644
index 43a1473edb2..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import scala.Product;
-
-/**
- * Sink to write Scala tuples and case classes into a Cassandra cluster. Please read the
- * recommendations in {@linkplain CassandraSinkBase}.
- *
- * @param <IN> Type of the elements emitted by this sink, it must extend {@link Product}
- */
-public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
-    public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder) {
-        this(insertQuery, builder, CassandraSinkBaseConfig.newBuilder().build());
-    }
-
-    CassandraScalaProductSink(
-            String insertQuery, ClusterBuilder builder, CassandraSinkBaseConfig config) {
-        this(insertQuery, builder, config, new NoOpCassandraFailureHandler());
-    }
-
-    CassandraScalaProductSink(
-            String insertQuery,
-            ClusterBuilder builder,
-            CassandraSinkBaseConfig config,
-            CassandraFailureHandler failureHandler) {
-        super(insertQuery, builder, config, failureHandler);
-    }
-
-    @Override
-    protected Object[] extract(IN record) {
-        Object[] al = new Object[record.productArity()];
-        for (int i = 0; i < record.productArity(); i++) {
-            al[i] = record.productElement(i);
-        }
-        return al;
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
deleted file mode 100644
index bf44f0a6eca..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.types.Row;
-
-import com.datastax.driver.core.Cluster;
-
-import java.time.Duration;
-
-import scala.Product;
-
-/**
- * This class wraps different Cassandra sink implementations to provide a common interface for all
- * of them.
- *
- * @param <IN> input type
- */
-public class CassandraSink<IN> {
-    private final boolean useDataStreamSink;
-    private DataStreamSink<IN> sink1;
-    private SingleOutputStreamOperator<IN> sink2;
-
-    private CassandraSink(DataStreamSink<IN> sink) {
-        sink1 = sink;
-        useDataStreamSink = true;
-    }
-
-    private CassandraSink(SingleOutputStreamOperator<IN> sink) {
-        sink2 = sink;
-        useDataStreamSink = false;
-    }
-
-    private LegacySinkTransformation<IN> getSinkTransformation() {
-        return sink1.getLegacyTransformation();
-    }
-
-    private Transformation<IN> getTransformation() {
-        return sink2.getTransformation();
-    }
-
-    /**
-     * Sets the name of this sink. This name is used by the visualization and logging during
-     * runtime.
-     *
-     * @return The named sink.
-     */
-    public CassandraSink<IN> name(String name) {
-        if (useDataStreamSink) {
-            getSinkTransformation().setName(name);
-        } else {
-            getTransformation().setName(name);
-        }
-        return this;
-    }
-
-    /**
-     * Sets an ID for this operator.
-     *
-     * <p>The specified ID is used to assign the same operator ID across job submissions (for
-     * example when starting a job from a savepoint).
-     *
-     * <p><strong>Important</strong>: this ID needs to be unique per transformation and job.
-     * Otherwise, job submission will fail.
-     *
-     * @param uid The unique user-specified ID of this transformation.
-     * @return The operator with the specified ID.
-     */
-    @PublicEvolving
-    public CassandraSink<IN> uid(String uid) {
-        if (useDataStreamSink) {
-            getSinkTransformation().setUid(uid);
-        } else {
-            getTransformation().setUid(uid);
-        }
-        return this;
-    }
-
-    /**
-     * Sets a user-provided hash for this operator. This will be used AS IS to create the
-     * JobVertexID.
-     *
-     * <p>The user-provided hash is an alternative to the generated hashes, used when identifying
-     * an operator through the default hash mechanics fails (e.g. because of changes between Flink
-     * versions).
-     *
-     * <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting.
-     * The provided hash needs to be unique per transformation and job. Otherwise, job submission
-     * will fail. Furthermore, you cannot assign a user-specified hash to intermediate nodes in an
-     * operator chain, and trying to do so will cause your job to fail.
-     *
-     * <p>A use case for this is migration between Flink versions or changing a job in a way that
-     * alters the automatically generated hashes. In this case, providing the previous hashes
-     * directly through this method (e.g. obtained from old logs) can help to reestablish a lost
-     * mapping from states to their target operator.
-     *
-     * @param uidHash The user provided hash for this operator. This will become the JobVertexID,
-     *     which is shown in the logs and web ui.
-     * @return The operator with the user provided hash.
-     */
-    @PublicEvolving
-    public CassandraSink<IN> setUidHash(String uidHash) {
-        if (useDataStreamSink) {
-            getSinkTransformation().setUidHash(uidHash);
-        } else {
-            getTransformation().setUidHash(uidHash);
-        }
-        return this;
-    }
-
-    /**
-     * Sets the parallelism for this sink. The parallelism must be greater than zero.
-     *
-     * @param parallelism The parallelism for this sink.
-     * @return The sink with set parallelism.
-     */
-    public CassandraSink<IN> setParallelism(int parallelism) {
-        if (useDataStreamSink) {
-            sink1.setParallelism(parallelism);
-        } else {
-            sink2.setParallelism(parallelism);
-        }
-        return this;
-    }
-
-    /**
-     * Turns off chaining for this operator so thread co-location will not be used as an
-     * optimization.
-     *
-     * <p>Chaining can be turned off for the whole job via {@link
-     * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()},
-     * but this is not advised for performance reasons.
-     *
-     * @return The sink with chaining disabled
-     */
-    public CassandraSink<IN> disableChaining() {
-        if (useDataStreamSink) {
-            sink1.disableChaining();
-        } else {
-            sink2.disableChaining();
-        }
-        return this;
-    }
-
-    /**
-     * Sets the slot sharing group of this operation. Parallel instances of operations that are in
-     * the same slot sharing group will be co-located in the same TaskManager slot, if possible.
-     *
-     * <p>Operations inherit the slot sharing group of input operations if all input operations are
-     * in the same slot sharing group and no slot sharing group was explicitly specified.
-     *
-     * <p>Initially an operation is in the default slot sharing group. An operation can be put into
-     * the default group explicitly by setting the slot sharing group to {@code "default"}.
-     *
-     * @param slotSharingGroup The slot sharing group name.
-     */
-    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
-        if (useDataStreamSink) {
-            getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
-        } else {
-            getTransformation().setSlotSharingGroup(slotSharingGroup);
-        }
-        return this;
-    }
-
-    /**
-     * Writes a DataStream into a Cassandra database.
-     *
-     * @param input input DataStream
-     * @param <IN> input type
-     * @return CassandraSinkBuilder, to further configure the sink
-     */
-    public static <IN> CassandraSinkBuilder<IN> addSink(
-            org.apache.flink.streaming.api.scala.DataStream<IN> input) {
-        return addSink(input.javaStream());
-    }
-
-    /**
-     * Writes a DataStream into a Cassandra database.
-     *
-     * @param input input DataStream
-     * @param <IN> input type
-     * @return CassandraSinkBuilder, to further configure the sink
-     */
-    public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
-        TypeInformation<IN> typeInfo = input.getType();
-        if (typeInfo instanceof TupleTypeInfo) {
-            DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
-            return (CassandraSinkBuilder<IN>)
-                    new CassandraTupleSinkBuilder<>(
-                            tupleInput,
-                            tupleInput.getType(),
-                            tupleInput
-                                    .getType()
-                                    .createSerializer(
-                                            tupleInput.getExecutionEnvironment().getConfig()));
-        }
-        if (typeInfo instanceof RowTypeInfo) {
-            DataStream<Row> rowInput = (DataStream<Row>) input;
-            return (CassandraSinkBuilder<IN>)
-                    new CassandraRowSinkBuilder(
-                            rowInput,
-                            rowInput.getType(),
-                            rowInput.getType()
-                                    .createSerializer(
-                                            rowInput.getExecutionEnvironment().getConfig()));
-        }
-        if (typeInfo instanceof PojoTypeInfo) {
-            return new CassandraPojoSinkBuilder<>(
-                    input,
-                    input.getType(),
-                    input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
-        }
-        if (typeInfo instanceof CaseClassTypeInfo) {
-            DataStream<Product> productInput = (DataStream<Product>) input;
-            return (CassandraSinkBuilder<IN>)
-                    new CassandraScalaProductSinkBuilder<>(
-                            productInput,
-                            productInput.getType(),
-                            productInput
-                                    .getType()
-                                    .createSerializer(input.getExecutionEnvironment().getConfig()));
-        }
-        throw new IllegalArgumentException(
-                "No support for the type of the given DataStream: " + input.getType());
-    }
-
-    /**
-     * Builder for a {@link CassandraSink}.
-     *
-     * @param <IN> input type
-     */
-    public abstract static class CassandraSinkBuilder<IN> {
-        protected final DataStream<IN> input;
-        protected final TypeSerializer<IN> serializer;
-        protected final TypeInformation<IN> typeInfo;
-        protected final CassandraSinkBaseConfig.Builder configBuilder;
-        protected ClusterBuilder builder;
-        protected String keyspace;
-        protected MapperOptions mapperOptions;
-        protected String query;
-        protected CheckpointCommitter committer;
-        protected boolean isWriteAheadLogEnabled;
-        protected CassandraFailureHandler failureHandler;
-
-        public CassandraSinkBuilder(
-                DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-            this.input = input;
-            this.typeInfo = typeInfo;
-            this.serializer = serializer;
-            this.configBuilder = CassandraSinkBaseConfig.newBuilder();
-        }
-
-        /**
-         * Sets the query that is to be executed for every record.
-         *
-         * @param query query to use
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> setQuery(String query) {
-            this.query = query;
-            return this;
-        }
-
-        /**
-         * Sets the keyspace to be used.
-         *
-         * @param keyspace keyspace to use
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> setDefaultKeyspace(String keyspace) {
-            this.keyspace = keyspace;
-            return this;
-        }
-
-        /**
-         * Sets the Cassandra host to connect to.
-         *
-         * @param host host to connect to
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> setHost(String host) {
-            return setHost(host, 9042);
-        }
-
-        /**
-         * Sets the Cassandra host/port to connect to.
-         *
-         * @param host host to connect to
-         * @param port port to connect to
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
-            if (this.builder != null) {
-                throw new IllegalArgumentException(
-                        "Builder was already set. You must use either setHost() or setClusterBuilder().");
-            }
-            this.builder =
-                    new ClusterBuilder() {
-                        @Override
-                        protected Cluster buildCluster(Cluster.Builder builder) {
-                            return builder.addContactPoint(host).withPort(port).build();
-                        }
-                    };
-            return this;
-        }
-
-        /**
-         * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the
-         * connection to Cassandra.
-         *
-         * @param builder ClusterBuilder to configure the connection to Cassandra
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
-            if (this.builder != null) {
-                throw new IllegalArgumentException(
-                        "Builder was already set. You must use either setHost() or setClusterBuilder().");
-            }
-            this.builder = builder;
-            return this;
-        }
-
-        /**
-         * Enables the write-ahead log, which allows exactly-once processing for non-deterministic
-         * algorithms that use idempotent updates.
-         *
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> enableWriteAheadLog() {
-            this.isWriteAheadLogEnabled = true;
-            return this;
-        }
-
-        /**
-         * Enables the write-ahead log, which allows exactly-once processing for non-deterministic
-         * algorithms that use idempotent updates.
-         *
-         * @param committer CheckpointCommitter that stores information about completed checkpoints
-         *     in an external resource. By default, this information is stored in a separate
-         *     table within Cassandra.
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) {
-            this.isWriteAheadLogEnabled = true;
-            this.committer = committer;
-            return this;
-        }
-
-        /**
-         * Sets the mapper options for this sink. The mapper options are used to configure the
-         * DataStax {@link com.datastax.driver.mapping.Mapper} when writing POJOs.
-         *
-         * <p>This call has no effect if the input {@link DataStream} for this sink does not contain
-         * POJOs.
-         *
-         * @param options MapperOptions that return an array of options used to configure the
-         *     DataStax mapper.
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> setMapperOptions(MapperOptions options) {
-            this.mapperOptions = options;
-            return this;
-        }
-
-        /**
-         * Sets the failure handler for this sink. The failure handler is used to provide custom
-         * error handling.
-         *
-         * @param failureHandler CassandraFailureHandler that handles any Throwable error.
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler) {
-            this.failureHandler = failureHandler;
-            return this;
-        }
-
-        /**
-         * Sets the maximum allowed number of concurrent requests for this sink.
-         *
-         * <p>This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is
-         * called.
-         *
-         * @param maxConcurrentRequests maximum number of concurrent requests allowed
-         * @param timeout timeout duration when acquiring a permit to execute
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> setMaxConcurrentRequests(
-                int maxConcurrentRequests, Duration timeout) {
-            this.configBuilder.setMaxConcurrentRequests(maxConcurrentRequests);
-            this.configBuilder.setMaxConcurrentRequestsTimeout(timeout);
-            return this;
-        }
-
-        /**
-         * Sets the maximum allowed number of concurrent requests for this sink.
-         *
-         * <p>This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is
-         * called.
-         *
-         * @param maxConcurrentRequests maximum number of concurrent requests allowed
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests) {
-            this.configBuilder.setMaxConcurrentRequests(maxConcurrentRequests);
-            return this;
-        }
-
-        /**
-         * Enables ignoring null values: null values are treated as unset, which avoids writing
-         * null fields and creating tombstones.
-         *
-         * <p>This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is
-         * called.
-         *
-         * @return this builder
-         */
-        public CassandraSinkBuilder<IN> enableIgnoreNullFields() {
-            this.configBuilder.setIgnoreNullFields(true);
-            return this;
-        }
-
-        /**
-         * Finalizes the configuration of this sink.
-         *
-         * @return finalized sink
-         * @throws Exception if the sink cannot be created
-         */
-        public CassandraSink<IN> build() throws Exception {
-            sanityCheck();
-            if (failureHandler == null) {
-                failureHandler = new NoOpCassandraFailureHandler();
-            }
-            return isWriteAheadLogEnabled ? createWriteAheadSink() : createSink();
-        }
-
-        protected abstract CassandraSink<IN> createSink() throws Exception;
-
-        protected abstract CassandraSink<IN> createWriteAheadSink() throws Exception;
-
-        protected void sanityCheck() {
-            if (builder == null) {
-                throw new IllegalArgumentException(
-                        "Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
-            }
-        }
-    }
-
-    /**
-     * Builder for a {@link CassandraTupleSink}.
-     *
-     * @param <IN> input type
-     */
-    public static class CassandraTupleSinkBuilder<IN extends Tuple>
-            extends CassandraSinkBuilder<IN> {
-        public CassandraTupleSinkBuilder(
-                DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-            super(input, typeInfo, serializer);
-        }
-
-        @Override
-        protected void sanityCheck() {
-            super.sanityCheck();
-            if (query == null || query.length() == 0) {
-                throw new IllegalArgumentException("Query must not be null or empty.");
-            }
-            if (keyspace != null) {
-                throw new IllegalArgumentException(
-                        "Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
-            }
-        }
-
-        @Override
-        public CassandraSink<IN> createSink() throws Exception {
-            final CassandraTupleSink<IN> sink =
-                    new CassandraTupleSink<>(query, builder, configBuilder.build(), failureHandler);
-            return new CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
-        }
-
-        @Override
-        protected CassandraSink<IN> createWriteAheadSink() throws Exception {
-            return committer == null
-                    ? new CassandraSink<>(
-                            input.transform(
-                                    "Cassandra Sink",
-                                    null,
-                                    new CassandraTupleWriteAheadSink<>(
-                                            query,
-                                            serializer,
-                                            builder,
-                                            new CassandraCommitter(builder))))
-                    : new CassandraSink<>(
-                            input.transform(
-                                    "Cassandra Sink",
-                                    null,
-                                    new CassandraTupleWriteAheadSink<>(
-                                            query, serializer, builder, committer)));
-        }
-    }
-
-    /** Builder for a {@link CassandraRowSink}. */
-    public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
-        public CassandraRowSinkBuilder(
-                DataStream<Row> input,
-                TypeInformation<Row> typeInfo,
-                TypeSerializer<Row> serializer) {
-            super(input, typeInfo, serializer);
-        }
-
-        @Override
-        protected void sanityCheck() {
-            super.sanityCheck();
-            if (query == null || query.length() == 0) {
-                throw new IllegalArgumentException("Query must not be null or empty.");
-            }
-            if (keyspace != null) {
-                throw new IllegalArgumentException(
-                        "Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
-            }
-        }
-
-        @Override
-        protected CassandraSink<Row> createSink() throws Exception {
-            final CassandraRowSink sink =
-                    new CassandraRowSink(
-                            typeInfo.getArity(),
-                            query,
-                            builder,
-                            configBuilder.build(),
-                            failureHandler);
-            return new CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
-        }
-
-        @Override
-        protected CassandraSink<Row> createWriteAheadSink() throws Exception {
-            return committer == null
-                    ? new CassandraSink<>(
-                            input.transform(
-                                    "Cassandra Sink",
-                                    null,
-                                    new CassandraRowWriteAheadSink(
-                                            query,
-                                            serializer,
-                                            builder,
-                                            new CassandraCommitter(builder))))
-                    : new CassandraSink<>(
-                            input.transform(
-                                    "Cassandra Sink",
-                                    null,
-                                    new CassandraRowWriteAheadSink(
-                                            query, serializer, builder, committer)));
-        }
-    }
-
-    /**
-     * Builder for a {@link CassandraPojoSink}.
-     *
-     * @param <IN> input type
-     */
-    public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
-        public CassandraPojoSinkBuilder(
-                DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-            super(input, typeInfo, serializer);
-        }
-
-        @Override
-        protected void sanityCheck() {
-            super.sanityCheck();
-            if (query != null) {
-                throw new IllegalArgumentException(
-                        "Specifying a query is not allowed when using a Pojo-Stream as input.");
-            }
-        }
-
-        @Override
-        public CassandraSink<IN> createSink() throws Exception {
-            final CassandraPojoSink<IN> sink =
-                    new CassandraPojoSink<>(
-                            typeInfo.getTypeClass(),
-                            builder,
-                            mapperOptions,
-                            keyspace,
-                            configBuilder.build(),
-                            failureHandler);
-            return new CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
-        }
-
-        @Override
-        protected CassandraSink<IN> createWriteAheadSink() throws Exception {
-            throw new IllegalArgumentException(
-                    "Exactly-once guarantees can only be provided for tuple types.");
-        }
-    }
-
-    /**
-     * Builder for a {@link CassandraScalaProductSink}.
-     *
-     * @param <IN> input type
-     */
-    public static class CassandraScalaProductSinkBuilder<IN extends Product>
-            extends CassandraSinkBuilder<IN> {
-        public CassandraScalaProductSinkBuilder(
-                DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-            super(input, typeInfo, serializer);
-        }
-
-        @Override
-        protected void sanityCheck() {
-            super.sanityCheck();
-            if (query == null || query.length() == 0) {
-                throw new IllegalArgumentException("Query must not be null or empty.");
-            }
-            if (keyspace != null) {
-                throw new IllegalArgumentException(
-                        "Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
-            }
-        }
-
-        @Override
-        public CassandraSink<IN> createSink() throws Exception {
-            final CassandraScalaProductSink<IN> sink =
-                    new CassandraScalaProductSink<>(
-                            query, builder, configBuilder.build(), failureHandler);
-            return new CassandraSink<>(input.addSink(sink).name("Cassandra Sink"));
-        }
-
-        @Override
-        protected CassandraSink<IN> createWriteAheadSink() throws Exception {
-            throw new IllegalArgumentException(
-                    "Exactly-once guarantees can only be provided for Flink tuple types.");
-        }
-    }
-}
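
The class above was the single entry point for all four supported input shapes
(Tuple, Row, POJO, Scala Product). A self-contained tuple example against the
removed API; the contact point and the demo.counts schema are placeholders:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Long>> counts =
            env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L));

    CassandraSink.addSink(counts)
            .setQuery("INSERT INTO demo.counts (word, n) VALUES (?, ?);")
            .setHost("127.0.0.1")
            .setMaxConcurrentRequests(500) // bound the number of in-flight writes
            .build()
            .name("Cassandra Sink");

    env.execute("cassandra-example");
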
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
deleted file mode 100644
index b107f27f9b2..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.SinkUtils;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.Preconditions;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link
- * CassandraTupleSink}.
- *
- * <p>If you experience the following error: {@code Error while sending value.
- * com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query
- * at consistency LOCAL_ONE (1 replica were required but only 0 acknowledged the write)},
- *
- * <p>it is recommended to increase the write timeout in your Cassandra cluster to match your
- * workload, so that such timeout errors do not happen. To do so, raise the
- * write_request_timeout_in_ms parameter in your cassandra.yaml. This exception means that the
- * Cassandra coordinator node waited too long for an internal replication (replication to another
- * node) and did not ack the write. It is not recommended to lower the replication factor in your
- * Cassandra cluster, because you must not lose data in case of a Cassandra cluster failure.
- * Waiting for a single replica to acknowledge the write is the minimum level for this guarantee
- * in Cassandra.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN>
-        implements CheckpointedFunction {
-    protected final Logger log = LoggerFactory.getLogger(getClass());
-
-    protected transient Cluster cluster;
-    protected transient Session session;
-
-    private AtomicReference<Throwable> throwable;
-    private FutureCallback<V> callback;
-    private Semaphore semaphore;
-
-    private final ClusterBuilder builder;
-    private final CassandraSinkBaseConfig config;
-
-    private final CassandraFailureHandler failureHandler;
-
-    CassandraSinkBase(
-            ClusterBuilder builder,
-            CassandraSinkBaseConfig config,
-            CassandraFailureHandler failureHandler) {
-        this.builder = builder;
-        this.config = config;
-        this.failureHandler = Preconditions.checkNotNull(failureHandler);
-        ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-    }
-
-    @Override
-    public void open(Configuration configuration) {
-        this.callback =
-                new FutureCallback<V>() {
-                    @Override
-                    public void onSuccess(V ignored) {
-                        semaphore.release();
-                    }
-
-                    @Override
-                    public void onFailure(Throwable t) {
-                        throwable.compareAndSet(null, t);
-                        log.error("Error while sending value.", t);
-                        semaphore.release();
-                    }
-                };
-        this.cluster = builder.getCluster();
-        this.session = createSession();
-
-        throwable = new AtomicReference<>();
-        semaphore = new Semaphore(config.getMaxConcurrentRequests());
-    }
-
-    @Override
-    public void close() throws Exception {
-        try {
-            checkAsyncErrors();
-            flush();
-            checkAsyncErrors();
-        } finally {
-            try {
-                if (session != null) {
-                    session.close();
-                }
-            } catch (Exception e) {
-                log.error("Error while closing session.", e);
-            }
-            try {
-                if (cluster != null) {
-                    cluster.close();
-                }
-            } catch (Exception e) {
-                log.error("Error while closing cluster.", e);
-            }
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {}
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
-        checkAsyncErrors();
-        flush();
-        checkAsyncErrors();
-    }
-
-    @Override
-    public void invoke(IN value) throws Exception {
-        checkAsyncErrors();
-        tryAcquire(1);
-        final ListenableFuture<V> result;
-        try {
-            result = send(value);
-        } catch (Throwable e) {
-            semaphore.release();
-            throw e;
-        }
-        Futures.addCallback(result, callback);
-    }
-
-    protected Session createSession() {
-        return cluster.connect();
-    }
-
-    public abstract ListenableFuture<V> send(IN value);
-
-    private void tryAcquire(int permits) throws InterruptedException, TimeoutException {
-        SinkUtils.tryAcquire(
-                permits,
-                config.getMaxConcurrentRequests(),
-                config.getMaxConcurrentRequestsTimeout(),
-                semaphore);
-    }
-
-    private void checkAsyncErrors() throws Exception {
-        final Throwable currentError = throwable.getAndSet(null);
-        if (currentError != null) {
-            failureHandler.onFailure(currentError);
-        }
-    }
-
-    private void flush() throws InterruptedException, TimeoutException {
-        tryAcquire(config.getMaxConcurrentRequests());
-        semaphore.release(config.getMaxConcurrentRequests());
-    }
-
-    @VisibleForTesting
-    int getAvailablePermits() {
-        return semaphore.availablePermits();
-    }
-
-    @VisibleForTesting
-    int getAcquiredPermits() {
-        return config.getMaxConcurrentRequests() - semaphore.availablePermits();
-    }
-}
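
The throttling above is a plain Semaphore: invoke() takes one permit per
in-flight request, the async callback releases it, and flush() drains all
permits so that snapshotState() and close() only return once every pending
write has been acknowledged. A stripped-down sketch of the same pattern
outside Flink (the class and method names here are illustrative, not part of
the removed code):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Semaphore;

    class BoundedAsyncWriter {
        private final Semaphore permits;
        private final int maxInFlight;

        BoundedAsyncWriter(int maxInFlight) {
            this.maxInFlight = maxInFlight;
            this.permits = new Semaphore(maxInFlight);
        }

        void write(Runnable asyncSend) throws InterruptedException {
            permits.acquire(); // blocks once maxInFlight requests are pending
            CompletableFuture.runAsync(asyncSend)
                    .whenComplete((ok, err) -> permits.release());
        }

        void flush() throws InterruptedException {
            permits.acquire(maxInFlight); // waits until every callback has fired
            permits.release(maxInFlight); // restore full capacity
        }
    }
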
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
deleted file mode 100644
index b4b7042b8fd..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.time.Duration;
-
-/** Configuration for {@link CassandraSinkBase}. */
-public final class CassandraSinkBaseConfig implements Serializable {
-    // ------------------------ Default Configurations ------------------------
-
-    /** The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. */
-    public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
-
-    /**
-     * The default timeout duration when acquiring a permit to execute. By default, {@code
-     * Long.MAX_VALUE}.
-     */
-    public static final Duration DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT =
-            Duration.ofMillis(Long.MAX_VALUE);
-
-    /** The default option to ignore null fields on insertion. By default, {@code false}. */
-    public static final boolean DEFAULT_IGNORE_NULL_FIELDS = false;
-
-    // ------------------------- Configuration Fields -------------------------
-
-    /** Maximum number of concurrent requests allowed. */
-    private final int maxConcurrentRequests;
-
-    /** Timeout duration when acquiring a permit to execute. */
-    private final Duration maxConcurrentRequestsTimeout;
-
-    /** Whether to ignore null fields on insert. */
-    private final boolean ignoreNullFields;
-
-    private CassandraSinkBaseConfig(
-            int maxConcurrentRequests,
-            Duration maxConcurrentRequestsTimeout,
-            boolean ignoreNullFields) {
-        Preconditions.checkArgument(
-                maxConcurrentRequests > 0, "Max concurrent requests is expected to be positive");
-        Preconditions.checkNotNull(
-                maxConcurrentRequestsTimeout, "Max concurrent requests timeout cannot be null");
-        Preconditions.checkArgument(
-                !maxConcurrentRequestsTimeout.isNegative(),
-                "Max concurrent requests timeout is expected to be non-negative");
-        this.maxConcurrentRequests = maxConcurrentRequests;
-        this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
-        this.ignoreNullFields = ignoreNullFields;
-    }
-
-    public int getMaxConcurrentRequests() {
-        return maxConcurrentRequests;
-    }
-
-    public Duration getMaxConcurrentRequestsTimeout() {
-        return maxConcurrentRequestsTimeout;
-    }
-
-    public boolean getIgnoreNullFields() {
-        return ignoreNullFields;
-    }
-
-    @Override
-    public String toString() {
-        return "CassandraSinkBaseConfig{"
-                + "maxConcurrentRequests="
-                + maxConcurrentRequests
-                + ", maxConcurrentRequestsTimeout="
-                + maxConcurrentRequestsTimeout
-                + ", ignoreNullFields="
-                + ignoreNullFields
-                + '}';
-    }
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /** Builder for the {@link CassandraSinkBaseConfig}. */
-    public static class Builder {
-        private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
-        private Duration maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
-        private boolean ignoreNullFields = DEFAULT_IGNORE_NULL_FIELDS;
-
-        Builder() {}
-
-        public Builder setMaxConcurrentRequests(int maxConcurrentRequests) {
-            this.maxConcurrentRequests = maxConcurrentRequests;
-            return this;
-        }
-
-        public Builder setMaxConcurrentRequestsTimeout(Duration timeout) {
-            this.maxConcurrentRequestsTimeout = timeout;
-            return this;
-        }
-
-        public Builder setIgnoreNullFields(boolean ignoreNullFields) {
-            this.ignoreNullFields = ignoreNullFields;
-            return this;
-        }
-
-        public CassandraSinkBaseConfig build() {
-            return new CassandraSinkBaseConfig(
-                    maxConcurrentRequests, maxConcurrentRequestsTimeout, ignoreNullFields);
-        }
-    }
-}
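
Construction of the config object above followed the usual builder pattern;
for example, to bound concurrency and skip null fields (the values here are
illustrative):

    import java.time.Duration;

    CassandraSinkBaseConfig config =
            CassandraSinkBaseConfig.newBuilder()
                    .setMaxConcurrentRequests(500)
                    .setMaxConcurrentRequestsTimeout(Duration.ofSeconds(30))
                    .setIgnoreNullFields(true)
                    .build();
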
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
deleted file mode 100644
index 523b163b1b8..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Sink to write Flink {@link Tuple}s into a Cassandra cluster. Please read the recommendations in
- * {@linkplain CassandraSinkBase}.
- *
- * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
- */
-public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
-    public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-        this(insertQuery, builder, CassandraSinkBaseConfig.newBuilder().build());
-    }
-
-    CassandraTupleSink(String insertQuery, ClusterBuilder builder, CassandraSinkBaseConfig config) {
-        this(insertQuery, builder, config, new NoOpCassandraFailureHandler());
-    }
-
-    CassandraTupleSink(
-            String insertQuery,
-            ClusterBuilder builder,
-            CassandraSinkBaseConfig config,
-            CassandraFailureHandler failureHandler) {
-        super(insertQuery, builder, config, failureHandler);
-    }
-
-    @Override
-    protected Object[] extract(IN record) {
-        Object[] al = new Object[record.getArity()];
-        for (int i = 0; i < record.getArity(); i++) {
-            al[i] = record.getField(i);
-        }
-        return al;
-    }
-}
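
Unlike the builder route shown earlier, the public constructor also allowed
attaching the tuple sink directly; the schema and the clusterBuilder variable
below are placeholders:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;

    DataStream<Tuple2<String, Long>> counts = ...; // hypothetical input stream
    counts.addSink(
            new CassandraTupleSink<>(
                    "INSERT INTO demo.counts (word, n) VALUES (?, ?);",
                    clusterBuilder)); // a ClusterBuilder as in the Row sink sketch
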
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
deleted file mode 100644
index d31565e3d0d..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Sink that emits its input elements into a Cassandra database. This sink stores incoming records
- * within a {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to
- * Cassandra when a checkpoint is completed.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
-    private static final long serialVersionUID = 1L;
-
-    protected transient Cluster cluster;
-    protected transient Session session;
-
-    private final String insertQuery;
-    private transient PreparedStatement preparedStatement;
-
-    private ClusterBuilder builder;
-
-    private transient Object[] fields;
-
-    protected CassandraTupleWriteAheadSink(
-            String insertQuery,
-            TypeSerializer<IN> serializer,
-            ClusterBuilder builder,
-            CheckpointCommitter committer)
-            throws Exception {
-        super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
-        this.insertQuery = insertQuery;
-        this.builder = builder;
-        ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-    }
-
-    public void open() throws Exception {
-        super.open();
-        if (!getRuntimeContext().isCheckpointingEnabled()) {
-            throw new IllegalStateException(
-                    "The write-ahead log requires checkpointing to be enabled.");
-        }
-        cluster = builder.getCluster();
-        session = cluster.connect();
-        preparedStatement = session.prepare(insertQuery);
-
-        fields = new Object[((TupleSerializer<IN>) serializer).getArity()];
-    }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-        try {
-            if (session != null) {
-                session.close();
-            }
-        } catch (Exception e) {
-            LOG.error("Error while closing session.", e);
-        }
-        try {
-            if (cluster != null) {
-                cluster.close();
-            }
-        } catch (Exception e) {
-            LOG.error("Error while closing cluster.", e);
-        }
-    }
-
-    @Override
-    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp)
-            throws Exception {
-        final AtomicInteger updatesCount = new AtomicInteger(0);
-        final AtomicInteger updatesConfirmed = new AtomicInteger(0);
-
-        final AtomicReference<Throwable> exception = new AtomicReference<>();
-
-        FutureCallback<ResultSet> callback =
-                new FutureCallback<ResultSet>() {
-                    @Override
-                    public void onSuccess(ResultSet resultSet) {
-                        updatesConfirmed.incrementAndGet();
-                        if (updatesCount.get() > 0) { // updatesCount is only set once all updates have been sent
-                            if (updatesCount.get() == updatesConfirmed.get()) {
-                                synchronized (updatesConfirmed) {
-                                    updatesConfirmed.notifyAll();
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        if (exception.compareAndSet(null, throwable)) {
-                            LOG.error("Error while sending value.", throwable);
-                            synchronized (updatesConfirmed) {
-                                updatesConfirmed.notifyAll();
-                            }
-                        }
-                    }
-                };
-
-        // set values for prepared statement
-        int updatesSent = 0;
-        for (IN value : values) {
-            for (int x = 0; x < value.getArity(); x++) {
-                fields[x] = value.getField(x);
-            }
-            // insert values and send to cassandra
-            BoundStatement s = preparedStatement.bind(fields);
-            s.setDefaultTimestamp(timestamp);
-            ResultSetFuture result = session.executeAsync(s);
-            updatesSent++;
-            if (result != null) {
-                // add callback to detect errors
-                Futures.addCallback(result, callback);
-            }
-        }
-        updatesCount.set(updatesSent);
-
-        synchronized (updatesConfirmed) {
-            while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
-                updatesConfirmed.wait();
-            }
-        }
-
-        if (exception.get() != null) {
-            LOG.warn("Sending a value failed.", exception.get());
-            return false;
-        } else {
-            return true;
-        }
-    }
-}
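
The write-ahead variant above buffers records in state and only writes them out on checkpoint
completion, so it refuses to open unless checkpointing is enabled; a hedged sketch of how it
was switched on via the sink builder:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5_000); // required: open() throws without checkpointing

    CassandraSink.addSink(env.fromElements(Tuple2.of("key", 1)))
            .setQuery("INSERT INTO example.counts (id, counter) VALUES (?, ?);")
            .setHost("127.0.0.1")  // placeholder contact point
            .enableWriteAheadLog() // routes records through GenericWriteAheadSink
            .build();

    env.execute("write with WAL");
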
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
deleted file mode 100644
index fb537bc6eac..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-
-import java.io.Serializable;
-
-/**
- * This class is used to configure a {@link com.datastax.driver.core.Cluster} after deployment. The
- * cluster represents the connection that will be established to Cassandra.
- */
-public abstract class ClusterBuilder implements Serializable {
-
-    public Cluster getCluster() {
-        return buildCluster(Cluster.builder());
-    }
-
-    /**
-     * Configures the connection to Cassandra. The configuration is done by calling methods on the
-     * builder object and finalizing the configuration with build().
-     *
-     * @param builder connection builder
-     * @return configured connection
-     */
-    protected abstract Cluster buildCluster(Cluster.Builder builder);
-}
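
Because ClusterBuilder is serializable and abstract, users typically supplied it as a small
anonymous subclass; a minimal sketch (contact point, port and credentials are placeholders):

    import com.datastax.driver.core.Cluster;
    import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

    ClusterBuilder builder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder b) {
            return b.addContactPoint("cassandra.example.com")
                    .withPort(9042)
                    .withCredentials("user", "secret")
                    .build();
        }
    };
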
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java
deleted file mode 100644
index d62fb986c0e..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.mapping.Mapper;
-
-import java.io.Serializable;
-
-/**
- * This class is used to configure a {@link com.datastax.driver.mapping.Mapper} after deployment.
- */
-public interface MapperOptions extends Serializable {
-
-    /**
- * Returns an array of {@link com.datastax.driver.mapping.Mapper.Option} that are used to configure
-     * the mapper.
-     *
-     * @return array of options used to configure the mapper.
-     */
-    Mapper.Option[] getMapperOptions();
-}
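
Since MapperOptions is a serializable single-method interface, it can be written as a lambda;
a minimal sketch:

    import com.datastax.driver.mapping.Mapper;
    import org.apache.flink.streaming.connectors.cassandra.MapperOptions;

    MapperOptions options = () -> new Mapper.Option[] {
        Mapper.Option.ttl(3600),            // expire rows after one hour
        Mapper.Option.saveNullFields(false) // do not write tombstones for null fields
    };
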
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/NoOpCassandraFailureHandler.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/NoOpCassandraFailureHandler.java
deleted file mode 100644
index 47399158c72..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/NoOpCassandraFailureHandler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.annotation.Internal;
-
-import java.io.IOException;
-
-/** A {@link CassandraFailureHandler} that simply fails the sink on any failures. */
-@Internal
-public class NoOpCassandraFailureHandler implements CassandraFailureHandler {
-
-    private static final long serialVersionUID = 737941343410827885L;
-
-    @Override
-    public void onFailure(Throwable failure) throws IOException {
-        // simply fail the sink
-        throw new IOException("Error while sending value.", failure);
-    }
-}
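
CassandraFailureHandler has the same single-method shape, so a custom handler could, for
instance, tolerate write timeouts while failing on everything else; a hypothetical sketch
(the lenient policy is illustrative, not part of the removed code):

    import com.datastax.driver.core.exceptions.WriteTimeoutException;
    import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;

    import java.io.IOException;

    CassandraFailureHandler lenientHandler = failure -> {
        if (!(failure instanceof WriteTimeoutException)) {
            // same behaviour as the no-op handler above: fail the sink
            throw new IOException("Error while sending value.", failure);
        }
        // write timeout: drop the record and keep the job running
    };
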
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleMapperOptions.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleMapperOptions.java
deleted file mode 100644
index ce93d37cf6d..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleMapperOptions.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.ProtocolVersion;
-import com.datastax.driver.mapping.Mapper;
-
-import java.util.ArrayList;
-
-/** A simple MapperOptions implementation. */
-public class SimpleMapperOptions implements MapperOptions {
-
-    private static final long serialVersionUID = 1L;
-
-    private final ArrayList<Mapper.Option> options;
-
-    public SimpleMapperOptions() {
-        options = new ArrayList<>();
-    }
-
-    /**
-     * Adds time-to-live option to a mapper operation. This is only valid for save operations.
-     *
-     * <p>Note that this option is only available if using {@link ProtocolVersion#V2} or above.
-     *
-     * @param ttl the TTL (in seconds).
-     */
-    public SimpleMapperOptions ttl(int ttl) {
-        options.add(Mapper.Option.ttl(ttl));
-        return this;
-    }
-
-    /**
-     * Adds a timestamp option to a mapper operation. This is only valid for save and delete
-     * operations.
-     *
-     * <p>Note that this option is only available if using {@link ProtocolVersion#V2} or above.
-     *
-     * @param timestamp the timestamp (in microseconds).
-     */
-    public SimpleMapperOptions timestamp(long timestamp) {
-        options.add(Mapper.Option.timestamp(timestamp));
-        return this;
-    }
-
-    /**
-     * Adds a consistency level value option to a mapper operation. This is valid for save, delete
-     * and get operations.
-     *
-     * <p>Note that the consistency level can also be defined at the mapper level, as a parameter of
-     * the {@link com.datastax.driver.mapping.annotations.Table} annotation (this is redundant for
-     * backward compatibility). This option, whether defined on a specific call or as the default,
-     * will always take precedence over the annotation.
-     *
-     * @param cl the {@link com.datastax.driver.core.ConsistencyLevel} to use for the operation.
-     */
-    public SimpleMapperOptions consistencyLevel(ConsistencyLevel cl) {
-        options.add(Mapper.Option.consistencyLevel(cl));
-        return this;
-    }
-
-    /**
-     * Enables query tracing for a mapper operation. This is valid for save, delete and get
-     * operations.
-     *
-     * @param enabled whether to enable tracing.
-     */
-    public SimpleMapperOptions tracing(boolean enabled) {
-        options.add(Mapper.Option.tracing(enabled));
-        return this;
-    }
-
-    /**
-     * Specifies whether null entity fields should be included in insert queries. This option is
-     * valid only for save operations.
-     *
-     * <p>If this option is not specified, it defaults to {@code true} (null fields are saved).
-     *
-     * @param enabled whether to include null fields in queries.
-     */
-    public SimpleMapperOptions saveNullFields(boolean enabled) {
-        options.add(Mapper.Option.saveNullFields(enabled));
-        return this;
-    }
-
-    /**
-     * Specifies whether an IF NOT EXISTS clause should be included in insert queries. This option
-     * is valid only for save operations.
-     *
-     * <p>If this option is not specified, it defaults to {@code false} (IF NOT EXISTS statements
-     * are not used).
-     *
-     * @param enabled whether to include an IF NOT EXISTS clause in queries.
-     */
-    public SimpleMapperOptions ifNotExists(boolean enabled) {
-        options.add(Mapper.Option.ifNotExists(enabled));
-        return this;
-    }
-
-    @Override
-    public Mapper.Option[] getMapperOptions() {
-        return options.toArray(new Mapper.Option[0]);
-    }
-}
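
Typical use was to chain a few of the methods above; a minimal sketch:

    import com.datastax.driver.core.ConsistencyLevel;
    import org.apache.flink.streaming.connectors.cassandra.SimpleMapperOptions;

    SimpleMapperOptions options = new SimpleMapperOptions()
            .ttl(86_400)                               // keep rows for one day
            .consistencyLevel(ConsistencyLevel.QUORUM) // per-operation consistency
            .saveNullFields(false);                    // avoid tombstones for null fields
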
diff --git a/flink-connectors/flink-connector-cassandra/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-connector-cassandra/src/main/resources/META-INF/NOTICE
deleted file mode 100644
index c4e1b3ac759..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/main/resources/META-INF/NOTICE
+++ /dev/null
@@ -1,11 +0,0 @@
-flink-connector-cassandra
-Copyright 2014-2022 The Apache Software Foundation
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
-
-This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) 
-
-- com.datastax.cassandra:cassandra-driver-core:shaded:3.11.2
-- com.datastax.cassandra:cassandra-driver-mapping:3.11.2
-- com.google.guava:guava:19.0
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
deleted file mode 100644
index 25ad1ad6c9b..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.architecture;
-
-import org.apache.flink.architecture.common.ImportOptions;
-
-import com.tngtech.archunit.core.importer.ImportOption;
-import com.tngtech.archunit.junit.AnalyzeClasses;
-import com.tngtech.archunit.junit.ArchTest;
-import com.tngtech.archunit.junit.ArchTests;
-
-/** Architecture tests for test code. */
-@AnalyzeClasses(
-        packages = {
-            "org.apache.flink.batch.connectors.cassandra",
-            "org.apache.flink.streaming.connectors.cassandra"
-        },
-        importOptions = {
-            ImportOption.OnlyIncludeTests.class,
-            ImportOptions.ExcludeScalaImportOption.class,
-            ImportOptions.ExcludeShadedImportOption.class
-        })
-public class TestCodeArchitectureTest {
-
-    @ArchTest
-    public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
deleted file mode 100644
index 2e1711330ea..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra.example;
-
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-
-import java.util.ArrayList;
-
-/**
- * This is an example showing how to use the Cassandra Input-/OutputFormats in the Batch API.
- *
- * <p>The example assumes that a table exists in a local cassandra database, according to the
- * following queries: CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class':
- * 'SimpleStrategy', 'replication_factor': '1'}; CREATE TABLE IF NOT EXISTS test.batches (number
- * int, strings text, PRIMARY KEY(number, strings));
- */
-public class BatchExample {
-    private static final String INSERT_QUERY =
-            "INSERT INTO test.batches (number, strings) VALUES (?,?);";
-    private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;";
-
-    /*
-     *	table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));"
-     */
-    public static void main(String[] args) throws Exception {
-
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-
-        ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20);
-        for (int i = 0; i < 20; i++) {
-            collection.add(new Tuple2<>(i, "string " + i));
-        }
-
-        DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection);
-
-        dataSet.output(
-                new CassandraTupleOutputFormat<Tuple2<Integer, String>>(
-                        INSERT_QUERY,
-                        new ClusterBuilder() {
-                            @Override
-                            protected Cluster buildCluster(Builder builder) {
-                                return builder.addContactPoints("127.0.0.1").build();
-                            }
-                        }));
-
-        env.execute("Write");
-
-        DataSet<Tuple2<Integer, String>> inputDS =
-                env.createInput(
-                        new CassandraInputFormat<Tuple2<Integer, String>>(
-                                SELECT_QUERY,
-                                new ClusterBuilder() {
-                                    @Override
-                                    protected Cluster buildCluster(Builder builder) {
-                                        return builder.addContactPoints("127.0.0.1").build();
-                                    }
-                                }),
-                        TupleTypeInfo.of(new TypeHint<Tuple2<Integer, String>>() {}));
-
-        inputDS.print();
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
deleted file mode 100644
index 764c0017e86..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra.example;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.mapping.Mapper;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-/**
- * This is an example showing how to use the {@link CassandraPojoInputFormat}/{@link
- * CassandraPojoOutputFormat} in the Batch API.
- *
- * <p>The example assumes that a table exists in a local cassandra database, according to the
- * following queries: CREATE KEYSPACE IF NOT EXISTS flink WITH replication = {'class':
- * 'SimpleStrategy', 'replication_factor': '1'}; CREATE TABLE IF NOT EXISTS flink.batches (id text,
- * counter int, batch_id int, PRIMARY KEY(id, counter, batch_id));
- */
-public class BatchPojoExample {
-    private static final String SELECT_QUERY = "SELECT id, counter, batch_id FROM flink.batches;";
-
-    public static void main(String[] args) throws Exception {
-
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-
-        List<Pojo> customCassandraAnnotatedPojos =
-                IntStream.range(0, 20)
-                        .mapToObj(x -> new Pojo(UUID.randomUUID().toString(), x, 0))
-                        .collect(Collectors.toList());
-
-        DataSet<Pojo> dataSet = env.fromCollection(customCassandraAnnotatedPojos);
-
-        ClusterBuilder clusterBuilder =
-                new ClusterBuilder() {
-                    private static final long serialVersionUID = -1754532803757154795L;
-
-                    @Override
-                    protected Cluster buildCluster(Cluster.Builder builder) {
-                        return builder.addContactPoints("127.0.0.1").build();
-                    }
-                };
-
-        dataSet.output(
-                new CassandraPojoOutputFormat<>(
-                        clusterBuilder,
-                        Pojo.class,
-                        () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}));
-
-        env.execute("Write");
-
-        /*
-         *	This is for the purpose of showing an example of creating a DataSet using CassandraPojoInputFormat.
-         */
-        DataSet<Pojo> inputDS =
-                env.createInput(
-                        new CassandraPojoInputFormat<>(
-                                SELECT_QUERY,
-                                clusterBuilder,
-                                Pojo.class,
-                                () ->
-                                        new Mapper.Option[] {
-                                            Mapper.Option.consistencyLevel(ConsistencyLevel.ANY)
-                                        }));
-
-        inputDS.print();
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
deleted file mode 100644
index 559f107984a..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra.example;
-
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.Table;
-
-import java.io.Serializable;
-
-/** Test Pojo annotated with DataStax mapping annotations. */
-@Table(keyspace = "flink", name = "batches")
-public class Pojo implements Serializable {
-
-    private static final long serialVersionUID = 1038054554690916991L;
-
-    @Column(name = "id")
-    private String id;
-
-    @Column(name = "counter")
-    private int counter;
-
-    @Column(name = "batch_id")
-    private int batchID;
-
-    // required for deserialization
-    public Pojo() {}
-
-    public Pojo(String id, int counter, int batchID) {
-        this.id = id;
-        this.counter = counter;
-        this.batchID = batchID;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public int getCounter() {
-        return counter;
-    }
-
-    public void setCounter(int counter) {
-        this.counter = counter;
-    }
-
-    public int getBatchID() {
-        return batchID;
-    }
-
-    public void setBatchID(int batchId) {
-        this.batchID = batchId;
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/ResultSetFutures.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/ResultSetFutures.java
deleted file mode 100644
index 1a7cd810afb..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/ResultSetFutures.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.cassandra.utils;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** Utility class to create {@link com.datastax.driver.core.ResultSetFuture}s. */
-public class ResultSetFutures {
-
-    private ResultSetFutures() {}
-
-    public static ResultSetFuture fromCompletableFuture(CompletableFuture<ResultSet> future) {
-        checkNotNull(future);
-        return new CompletableResultSetFuture(future);
-    }
-
-    private static class CompletableResultSetFuture implements ResultSetFuture {
-
-        private final CompletableFuture<ResultSet> completableFuture;
-
-        CompletableResultSetFuture(CompletableFuture<ResultSet> future) {
-            this.completableFuture = future;
-        }
-
-        @Override
-        public ResultSet getUninterruptibly() {
-            try {
-                return completableFuture.get();
-            } catch (InterruptedException e) {
-                return getUninterruptibly();
-            } catch (ExecutionException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public ResultSet getUninterruptibly(long l, TimeUnit timeUnit) throws TimeoutException {
-            try {
-                return completableFuture.get(l, timeUnit);
-            } catch (InterruptedException e) {
-                return getUninterruptibly();
-            } catch (ExecutionException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public boolean cancel(boolean b) {
-            return completableFuture.cancel(b);
-        }
-
-        @Override
-        public boolean isCancelled() {
-            return completableFuture.isCancelled();
-        }
-
-        @Override
-        public boolean isDone() {
-            return completableFuture.isDone();
-        }
-
-        @Override
-        public ResultSet get() throws InterruptedException, ExecutionException {
-            return completableFuture.get();
-        }
-
-        @Override
-        public ResultSet get(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, TimeoutException {
-            return completableFuture.get(timeout, unit);
-        }
-
-        @Override
-        public void addListener(Runnable listener, Executor executor) {
-            completableFuture.whenComplete((result, error) -> listener.run());
-        }
-    }
-}
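
In tests this adapter let a plain CompletableFuture stand in for the driver's async result; a
minimal sketch (the simulated failure is illustrative):

    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.ResultSetFuture;
    import org.apache.flink.connectors.cassandra.utils.ResultSetFutures;

    import java.util.concurrent.CompletableFuture;

    CompletableFuture<ResultSet> driverAnswer = new CompletableFuture<>();
    ResultSetFuture future = ResultSetFutures.fromCompletableFuture(driverAnswer);

    // hand `future` to the code under test, then simulate the driver's answer:
    driverAnswer.completeExceptionally(new RuntimeException("simulated write failure"));
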
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
deleted file mode 100644
index 91967cfd660..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ /dev/null
@@ -1,912 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
-import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
-import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.testutils.junit.RetryOnException;
-import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.DockerImageVersions;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.annotations.Table;
-import net.bytebuddy.ByteBuddy;
-import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.CassandraContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.images.builder.Transferable;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** IT cases for all cassandra sinks. */
-@SuppressWarnings("serial")
-// NoHostAvailableException is raised by the Cassandra client under load while connecting to the cluster
-@RetryOnException(times = 3, exception = NoHostAvailableException.class)
-@Testcontainers
-@ExtendWith(RetryExtension.class)
-class CassandraConnectorITCase
-        extends WriteAheadSinkTestBase<
-                Tuple3<String, Integer, Integer>,
-                CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
-
-    private static final int MAX_CONNECTION_RETRY = 3;
-    private static final long CONNECTION_RETRY_DELAY = 500L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
-    private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
-
-    @TempDir static Path tmpDir;
-
-    private static final int READ_TIMEOUT_MILLIS = 36000;
-
-    @Container static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
-
-    private static final int PORT = 9042;
-
-    private static Cluster cluster;
-    private static Session session;
-
-    private final ClusterBuilder builderForReading =
-            createBuilderWithConsistencyLevel(ConsistencyLevel.ONE);
-    // Lower consistency level ANY is only available for writing.
-    private final ClusterBuilder builderForWriting =
-            createBuilderWithConsistencyLevel(ConsistencyLevel.ANY);
-
-    private ClusterBuilder createBuilderWithConsistencyLevel(ConsistencyLevel consistencyLevel) {
-        return new ClusterBuilder() {
-            @Override
-            protected Cluster buildCluster(Cluster.Builder builder) {
-                return builder.addContactPointsWithPorts(
-                                new InetSocketAddress(
-                                        CASSANDRA_CONTAINER.getHost(),
-                                        CASSANDRA_CONTAINER.getMappedPort(PORT)))
-                        .withQueryOptions(
-                                new QueryOptions()
-                                        .setConsistencyLevel(consistencyLevel)
-                                        .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
-                        .withSocketOptions(
-                                new SocketOptions()
-                                        // default timeout x 3
-                                        .setConnectTimeoutMillis(15000)
-                                        // default timeout x3 and higher than
-                                        // request_timeout_in_ms at the cluster level
-                                        .setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
-                        .withoutJMXReporting()
-                        .withoutMetrics()
-                        .build();
-            }
-        };
-    }
-
-    private static final String TABLE_NAME_PREFIX = "flink_";
-    private static final String TABLE_NAME_VARIABLE = "$TABLE";
-    private static final String KEYSPACE = "flink";
-    private static final String TUPLE_ID_FIELD = "id";
-    private static final String TUPLE_COUNTER_FIELD = "counter";
-    private static final String TUPLE_BATCHID_FIELD = "batch_id";
-    private static final String CREATE_KEYSPACE_QUERY =
-            "CREATE KEYSPACE "
-                    + KEYSPACE
-                    + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-    private static final String CREATE_TABLE_QUERY =
-            "CREATE TABLE "
-                    + KEYSPACE
-                    + "."
-                    + TABLE_NAME_VARIABLE
-                    + " ("
-                    + TUPLE_ID_FIELD
-                    + " text PRIMARY KEY, "
-                    + TUPLE_COUNTER_FIELD
-                    + " int, "
-                    + TUPLE_BATCHID_FIELD
-                    + " int);";
-    private static final String INSERT_DATA_QUERY =
-            "INSERT INTO "
-                    + KEYSPACE
-                    + "."
-                    + TABLE_NAME_VARIABLE
-                    + " ("
-                    + TUPLE_ID_FIELD
-                    + ", "
-                    + TUPLE_COUNTER_FIELD
-                    + ", "
-                    + TUPLE_BATCHID_FIELD
-                    + ") VALUES (?, ?, ?)";
-    private static final String SELECT_DATA_QUERY =
-            "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_VARIABLE + ';';
-
-    private static final Random random = new Random();
-    private int tableID;
-
-    private static final ArrayList<Tuple3<String, Integer, Integer>> collection =
-            new ArrayList<>(20);
-    private static final ArrayList<Row> rowCollection = new ArrayList<>(20);
-
-    private static final TypeInformation[] FIELD_TYPES = {
-        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
-    };
-
-    static {
-        for (int i = 0; i < 20; i++) {
-            collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
-            rowCollection.add(Row.of(UUID.randomUUID().toString(), i, 0));
-        }
-    }
-
-    private static Class<? extends Pojo> annotatePojoWithTable(String keyspace, String tableName) {
-        return new ByteBuddy()
-                .redefine(Pojo.class)
-                .name("org.apache.flink.streaming.connectors.cassandra.Pojo" + tableName)
-                .annotateType(createTableAnnotation(keyspace, tableName))
-                .make()
-                .load(Pojo.class.getClassLoader())
-                .getLoaded();
-    }
-
-    @NotNull
-    private static Table createTableAnnotation(String keyspace, String tableName) {
-        return new Table() {
-
-            @Override
-            public String keyspace() {
-                return keyspace;
-            }
-
-            @Override
-            public String name() {
-                return tableName;
-            }
-
-            @Override
-            public boolean caseSensitiveKeyspace() {
-                return false;
-            }
-
-            @Override
-            public boolean caseSensitiveTable() {
-                return false;
-            }
-
-            @Override
-            public String writeConsistency() {
-                return "";
-            }
-
-            @Override
-            public String readConsistency() {
-                return "";
-            }
-
-            @Override
-            public Class<? extends Annotation> annotationType() {
-                return Table.class;
-            }
-        };
-    }
-
-    // ------------------------------------------------------------------------
-    //  Utility methods
-    // ------------------------------------------------------------------------
-
-    public static CassandraContainer createCassandraContainer() {
-        CassandraContainer cassandra = new CassandraContainer(DockerImageVersions.CASSANDRA_4_0);
-        cassandra.withJmxReporting(false);
-        cassandra.withLogConsumer(LOG_CONSUMER);
-        return cassandra;
-    }
-
-    private static void raiseCassandraRequestsTimeouts() {
-        try {
-            final Path configurationPath = Files.createTempFile(tmpDir.toAbsolutePath(), "", "");
-            CASSANDRA_CONTAINER.copyFileFromContainer(
-                    "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
-            String configuration =
-                    new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
-            String patchedConfiguration =
-                    configuration
-                            .replaceAll(
-                                    "request_timeout_in_ms: [0-9]+",
-                                    "request_timeout_in_ms: 30000") // x3 default timeout
-                            .replaceAll(
-                                    "read_request_timeout_in_ms: [0-9]+",
-                                    "read_request_timeout_in_ms: 15000") // x3 default timeout
-                            .replaceAll(
-                                    "write_request_timeout_in_ms: [0-9]+",
-                                    "write_request_timeout_in_ms: 6000"); // x3 default timeout
-            CASSANDRA_CONTAINER.copyFileToContainer(
-                    Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
-                    "/etc/cassandra/cassandra.yaml");
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to open Cassandra configuration file ", e);
-        }
-    }
-
-    private <T> List<T> readPojosWithInputFormat(Class<T> annotatedPojoClass) {
-        final CassandraPojoInputFormat<T> source =
-                new CassandraPojoInputFormat<>(
-                        injectTableName(SELECT_DATA_QUERY), builderForReading, annotatedPojoClass);
-        List<T> result = new ArrayList<>();
-
-        try {
-            source.configure(new Configuration());
-            source.open(null);
-            while (!source.reachedEnd()) {
-                T temp = source.nextRecord(null);
-                result.add(temp);
-            }
-        } finally {
-            source.close();
-        }
-        return result;
-    }
-
-    private <T> List<T> writePojosWithOutputFormat(Class<T> annotatedPojoClass) throws Exception {
-        final CassandraPojoOutputFormat<T> sink =
-                new CassandraPojoOutputFormat<>(
-                        builderForWriting,
-                        annotatedPojoClass,
-                        () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});
-
-        final Constructor<T> pojoConstructor = getPojoConstructor(annotatedPojoClass);
-        List<T> pojos = new ArrayList<>();
-        for (int i = 0; i < 20; i++) {
-            pojos.add(pojoConstructor.newInstance(UUID.randomUUID().toString(), i, 0));
-        }
-        try {
-            sink.configure(new Configuration());
-            sink.open(0, 1);
-            for (T pojo : pojos) {
-                sink.writeRecord(pojo);
-            }
-        } finally {
-            sink.close();
-        }
-        return pojos;
-    }
-
-    private <T> Constructor<T> getPojoConstructor(Class<T> annotatedPojoClass)
-            throws NoSuchMethodException {
-        return annotatedPojoClass.getConstructor(String.class, Integer.TYPE, Integer.TYPE);
-    }
-
-    private String injectTableName(String target) {
-        return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
-    }
-
-    // ------------------------------------------------------------------------
-    //  Tests initialization
-    // ------------------------------------------------------------------------
-
-    @BeforeAll
-    static void startAndInitializeCassandra() {
-        raiseCassandraRequestsTimeouts();
-        // CASSANDRA_CONTAINER#start() already contains retries
-        CASSANDRA_CONTAINER.start();
-        cluster = CASSANDRA_CONTAINER.getCluster();
-        int retried = 0;
-        while (retried < MAX_CONNECTION_RETRY) {
-            try {
-                session = cluster.connect();
-                break;
-            } catch (NoHostAvailableException e) {
-                retried++;
-                LOG.debug(
-                        "Connection failed with NoHostAvailableException : retry number {}, will retry to connect within {} ms",
-                        retried,
-                        CONNECTION_RETRY_DELAY);
-                if (retried == MAX_CONNECTION_RETRY) {
-                    throw new RuntimeException(
-                            String.format(
-                                    "Failed to connect to Cassandra cluster after %d retries every %d ms",
-                                    retried, CONNECTION_RETRY_DELAY),
-                            e);
-                }
-                try {
-                    Thread.sleep(CONNECTION_RETRY_DELAY);
-                } catch (InterruptedException ignored) {
-                }
-            }
-        }
-        session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
-    }
-
-    @BeforeEach
-    void createTable() {
-        tableID = random.nextInt(Integer.MAX_VALUE);
-        session.execute(requestWithTimeout(injectTableName(CREATE_TABLE_QUERY)));
-    }
-
-    @AfterAll
-    static void closeCassandra() {
-        if (session != null) {
-            session.close();
-        }
-        if (cluster != null) {
-            cluster.close();
-        }
-        CASSANDRA_CONTAINER.stop();
-    }
-
-    // ------------------------------------------------------------------------
-    //  Technical Tests
-    // ------------------------------------------------------------------------
-
-    @TestTemplate
-    void testAnnotatePojoWithTable() {
-        final String tableName = TABLE_NAME_PREFIX + tableID;
-
-        final Class<? extends Pojo> annotatedPojoClass = annotatePojoWithTable(KEYSPACE, tableName);
-        final Table pojoTableAnnotation = annotatedPojoClass.getAnnotation(Table.class);
-        assertThat(pojoTableAnnotation.name()).contains(tableName);
-    }
-
-    @TestTemplate
-    void testRaiseCassandraRequestsTimeouts() throws IOException {
-        // raiseCassandraRequestsTimeouts() was already called in @BeforeAll,
-        // do not change the container conf twice, just assert that it was indeed changed in the
-        // container
-        final Path configurationPath = Files.createTempFile(tmpDir.toAbsolutePath(), "", "");
-        CASSANDRA_CONTAINER.copyFileFromContainer(
-                "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
-        final String configuration =
-                new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
-        assertThat(configuration)
-                .contains("request_timeout_in_ms: 30000")
-                .contains("read_request_timeout_in_ms: 15000")
-                .contains("write_request_timeout_in_ms: 6000");
-    }
-
-    // ------------------------------------------------------------------------
-    //  Exactly-once Tests
-    // ------------------------------------------------------------------------
-
-    @Override
-    protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink()
-            throws Exception {
-        return new CassandraTupleWriteAheadSink<>(
-                injectTableName(INSERT_DATA_QUERY),
-                TypeExtractor.getForObject(new Tuple3<>("", 0, 0))
-                        .createSerializer(new ExecutionConfig()),
-                builderForReading,
-                new CassandraCommitter(builderForReading));
-    }
-
-    @Override
-    protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
-        return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
-    }
-
-    @Override
-    protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
-        return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
-    }
-
-    @Override
-    protected void verifyResultsIdealCircumstances(
-            CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-        ResultSet result = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        ArrayList<Integer> list = new ArrayList<>();
-        for (int x = 1; x <= 60; x++) {
-            list.add(x);
-        }
-
-        for (com.datastax.driver.core.Row s : result) {
-            list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
-        }
-        assertThat(list)
-                .as("The following ID's were not found in the ResultSet: " + list)
-                .isEmpty();
-    }
-
-    @Override
-    protected void verifyResultsDataPersistenceUponMissedNotify(
-            CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-        ResultSet result = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        ArrayList<Integer> list = new ArrayList<>();
-        for (int x = 1; x <= 60; x++) {
-            list.add(x);
-        }
-
-        for (com.datastax.driver.core.Row s : result) {
-            list.remove(Integer.valueOf(s.getInt(TUPLE_COUNTER_FIELD)));
-        }
-        assertThat(list)
-                .as("The following ID's were not found in the ResultSet: " + list)
-                .isEmpty();
-    }
-
-    @Override
-    protected void verifyResultsDataDiscardingUponRestore(
-            CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-
-        ResultSet result = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        ArrayList<Integer> list = new ArrayList<>();
-        for (int x = 1; x <= 20; x++) {
-            list.add(x);
-        }
-        for (int x = 41; x <= 60; x++) {
-            list.add(x);
-        }
-
-        for (com.datastax.driver.core.Row s : result) {
-            list.remove(Integer.valueOf(s.getInt(TUPLE_COUNTER_FIELD)));
-        }
-        assertThat(list)
-                .as("The following ID's were not found in the ResultSet: " + list)
-                .isEmpty();
-    }
-
-    @Override
-    protected void verifyResultsWhenReScaling(
-            CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink,
-            int startElementCounter,
-            int endElementCounter) {
-
-        // IMPORTANT NOTE:
-        //
-        // for Cassandra we always have to start from 1 because
-        // all operators share the same final database
-
-        ArrayList<Integer> expected = new ArrayList<>();
-        for (int i = 1; i <= endElementCounter; i++) {
-            expected.add(i);
-        }
-
-        ArrayList<Integer> actual = new ArrayList<>();
-        ResultSet result = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-
-        for (com.datastax.driver.core.Row s : result) {
-            actual.add(s.getInt(TUPLE_COUNTER_FIELD));
-        }
-
-        Collections.sort(actual);
-        assertThat(actual).isEqualTo(expected);
-    }
-
-    @TestTemplate
-    void testCassandraCommitter() throws Exception {
-        String jobID = new JobID().toString();
-        CassandraCommitter cc1 = new CassandraCommitter(builderForReading, "flink_auxiliary_cc");
-        cc1.setJobId(jobID);
-        cc1.setOperatorId("operator");
-
-        CassandraCommitter cc2 = new CassandraCommitter(builderForReading, "flink_auxiliary_cc");
-        cc2.setJobId(jobID);
-        cc2.setOperatorId("operator");
-
-        CassandraCommitter cc3 = new CassandraCommitter(builderForReading, "flink_auxiliary_cc");
-        cc3.setJobId(jobID);
-        cc3.setOperatorId("operator1");
-
-        cc1.createResource();
-
-        cc1.open();
-        cc2.open();
-        cc3.open();
-
-        assertThat(cc1.isCheckpointCommitted(0, 1)).isFalse();
-        assertThat(cc2.isCheckpointCommitted(1, 1)).isFalse();
-        assertThat(cc3.isCheckpointCommitted(0, 1)).isFalse();
-
-        cc1.commitCheckpoint(0, 1);
-        assertThat(cc1.isCheckpointCommitted(0, 1)).isTrue();
-        // verify that other sub-tasks aren't affected
-        assertThat(cc2.isCheckpointCommitted(1, 1)).isFalse();
-        // verify that other tasks aren't affected
-        assertThat(cc3.isCheckpointCommitted(0, 1)).isFalse();
-
-        assertThat(cc1.isCheckpointCommitted(0, 2)).isFalse();
-
-        cc1.close();
-        cc2.close();
-        cc3.close();
-
-        cc1 = new CassandraCommitter(builderForReading, "flink_auxiliary_cc");
-        cc1.setJobId(jobID);
-        cc1.setOperatorId("operator");
-
-        cc1.open();
-
-        // verify that checkpoint data is not destroyed within open/close and not reliant on
-        // internally cached data
-        assertThat(cc1.isCheckpointCommitted(0, 1)).isTrue();
-        assertThat(cc1.isCheckpointCommitted(0, 2)).isFalse();
-
-        cc1.close();
-    }
-
-    // ------------------------------------------------------------------------
-    //  At-least-once Tests
-    // ------------------------------------------------------------------------
-
-    @TestTemplate
-    void testCassandraTupleAtLeastOnceSink() throws Exception {
-        CassandraTupleSink<Tuple3<String, Integer, Integer>> sink =
-                new CassandraTupleSink<>(injectTableName(INSERT_DATA_QUERY), builderForWriting);
-        try {
-            sink.open(new Configuration());
-            for (Tuple3<String, Integer, Integer> value : collection) {
-                sink.send(value);
-            }
-        } finally {
-            sink.close();
-        }
-
-        ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        assertThat(rs.all()).hasSize(20);
-    }
-
-    @TestTemplate
-    void testCassandraRowAtLeastOnceSink() throws Exception {
-        CassandraRowSink sink =
-                new CassandraRowSink(
-                        FIELD_TYPES.length, injectTableName(INSERT_DATA_QUERY), builderForWriting);
-        try {
-            sink.open(new Configuration());
-            for (Row value : rowCollection) {
-                sink.send(value);
-            }
-        } finally {
-            sink.close();
-        }
-
-        ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        assertThat(rs.all()).hasSize(20);
-    }
-
-    @TestTemplate
-    void testCassandraPojoAtLeastOnceSink() throws Exception {
-        final Class<? extends Pojo> annotatedPojoClass =
-                annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
-        writePojos(annotatedPojoClass, null);
-
-        ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        assertThat(rs.all()).hasSize(20);
-    }
-
-    @TestTemplate
-    void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
-        final Class<? extends Pojo> annotatedPojoClass =
-                annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID);
-        writePojos(annotatedPojoClass, KEYSPACE);
-        ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        assertThat(rs.all()).hasSize(20);
-    }
-
-    private <T> void writePojos(Class<T> annotatedPojoClass, @Nullable String keyspace)
-            throws Exception {
-        final Constructor<T> pojoConstructor = getPojoConstructor(annotatedPojoClass);
-        CassandraPojoSink<T> sink =
-                new CassandraPojoSink<>(annotatedPojoClass, builderForWriting, null, keyspace);
-        try {
-            sink.open(new Configuration());
-            for (int x = 0; x < 20; x++) {
-                sink.send(pojoConstructor.newInstance(UUID.randomUUID().toString(), x, 0));
-            }
-        } finally {
-            sink.close();
-        }
-    }
-
-    @TestTemplate
-    void testCassandraTableSink() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(4);
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
-        DataStreamSource<Row> source = env.fromCollection(rowCollection);
-
-        tEnv.createTemporaryView("testFlinkTable", source);
-        ((TableEnvironmentInternal) tEnv)
-                .registerTableSinkInternal(
-                        "cassandraTable",
-                        new CassandraAppendTableSink(
-                                        builderForWriting, injectTableName(INSERT_DATA_QUERY))
-                                .configure(
-                                        new String[] {"f0", "f1", "f2"},
-                                        new TypeInformation[] {
-                                            Types.STRING, Types.INT, Types.INT
-                                        }));
-
-        tEnv.sqlQuery("select * from testFlinkTable").executeInsert("cassandraTable").await();
-
-        ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-
-        // validate that all input was correctly written to Cassandra
-        List<Row> input = new ArrayList<>(rowCollection);
-        List<com.datastax.driver.core.Row> output = rs.all();
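-        // the SELECT returns the columns in a different order than the input Row, hence
-        // the swapped indices below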
-        for (com.datastax.driver.core.Row o : output) {
-            Row cmp = new Row(3);
-            cmp.setField(0, o.getString(0));
-            cmp.setField(1, o.getInt(2));
-            cmp.setField(2, o.getInt(1));
-            assertThat(input.remove(cmp))
-                    .as("Row " + cmp + " was written to Cassandra but not in input.")
-                    .isTrue();
-        }
-        assertThat(input).as("The input data was not completely written to Cassandra").isEmpty();
-    }
-
-    private static int retrialsCount = 0;
-
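-    // fails the first two invocations with NoHostAvailableException on purpose, so that
-    // the surrounding retry logic re-runs the test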
-    @TestTemplate
-    void testRetrial() {
-        annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
-        if (retrialsCount < 2) {
-            retrialsCount++;
-            throw new NoHostAvailableException(new HashMap<>());
-        }
-    }
-
-    @TestTemplate
-    void testCassandraBatchPojoFormat() throws Exception {
-
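-        // round-trip test: write the pojos with the output format, read them back with
-        // the input format, and compare while ignoring collection order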
-        final Class<? extends Pojo> annotatedPojoClass =
-                annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
-
-        final List<? extends Pojo> pojos = writePojosWithOutputFormat(annotatedPojoClass);
-        ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        assertThat(rs.all()).hasSize(20);
-
-        final List<? extends Pojo> result = readPojosWithInputFormat(annotatedPojoClass);
-        assertThat(result)
-                .hasSize(20)
-                .usingRecursiveComparison(
-                        RecursiveComparisonConfiguration.builder()
-                                .withIgnoreCollectionOrder(true)
-                                .build())
-                .isEqualTo(pojos);
-    }
-
-    @TestTemplate
-    void testCassandraBatchTupleFormat() throws Exception {
-        OutputFormat<Tuple3<String, Integer, Integer>> sink =
-                new CassandraTupleOutputFormat<>(
-                        injectTableName(INSERT_DATA_QUERY), builderForWriting);
-        try {
-            sink.configure(new Configuration());
-            sink.open(0, 1);
-            for (Tuple3<String, Integer, Integer> value : collection) {
-                sink.writeRecord(value);
-            }
-        } finally {
-            sink.close();
-        }
-
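-        // write the same records again with a fresh output format; Cassandra upserts on
-        // the primary key, so the row count stays at 20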
-        sink =
-                new CassandraTupleOutputFormat<>(
-                        injectTableName(INSERT_DATA_QUERY), builderForWriting);
-        try {
-            sink.configure(new Configuration());
-            sink.open(0, 1);
-            for (Tuple3<String, Integer, Integer> value : collection) {
-                sink.writeRecord(value);
-            }
-        } finally {
-            sink.close();
-        }
-
-        InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source =
-                new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY), builderForReading);
-        List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
-        try {
-            source.configure(new Configuration());
-            source.open(null);
-            while (!source.reachedEnd()) {
-                result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
-            }
-        } finally {
-            source.close();
-        }
-
-        assertThat(result).hasSize(20);
-    }
-
-    @TestTemplate
-    void testCassandraBatchRowFormat() throws Exception {
-        OutputFormat<Row> sink =
-                new CassandraRowOutputFormat(injectTableName(INSERT_DATA_QUERY), builderForWriting);
-        try {
-            sink.configure(new Configuration());
-            sink.open(0, 1);
-            for (Row value : rowCollection) {
-                sink.writeRecord(value);
-            }
-        } finally {
-            sink.close();
-        }
-
-        ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        List<com.datastax.driver.core.Row> rows = rs.all();
-        assertThat(rows).hasSameSizeAs(rowCollection);
-    }
-
-    @TestTemplate
-    void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws Exception {
-        Class<scala.Tuple1<String>> c =
-                (Class<scala.Tuple1<String>>) new scala.Tuple1<>("hello").getClass();
-        Seq<TypeInformation<?>> typeInfos =
-                JavaConverters.asScalaBufferConverter(
-                                Collections.<TypeInformation<?>>singletonList(
-                                        BasicTypeInfo.STRING_TYPE_INFO))
-                        .asScala();
-        Seq<String> fieldNames =
-                JavaConverters.asScalaBufferConverter(Collections.singletonList("_1")).asScala();
-
-        CaseClassTypeInfo<scala.Tuple1<String>> typeInfo =
-                new CaseClassTypeInfo<scala.Tuple1<String>>(c, null, typeInfos, fieldNames) {
-                    @Override
-                    public TypeSerializer<scala.Tuple1<String>> createSerializer(
-                            ExecutionConfig config) {
-                        return null;
-                    }
-                };
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStream<scala.Tuple1<String>> input =
-                env.fromElements(new scala.Tuple1<>("hello")).returns(typeInfo);
-
-        CassandraSink.CassandraSinkBuilder<scala.Tuple1<String>> sinkBuilder =
-                CassandraSink.addSink(input);
-        assertThat(sinkBuilder).isInstanceOf(CassandraSink.CassandraScalaProductSinkBuilder.class);
-    }
-
-    @TestTemplate
-    void testCassandraScalaTupleAtLeastSink() throws Exception {
-        CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink =
-                new CassandraScalaProductSink<>(
-                        injectTableName(INSERT_DATA_QUERY), builderForWriting);
-
-        List<scala.Tuple3<String, Integer, Integer>> scalaTupleCollection = new ArrayList<>(20);
-        for (int i = 0; i < 20; i++) {
-            scalaTupleCollection.add(new scala.Tuple3<>(UUID.randomUUID().toString(), i, 0));
-        }
-        try {
-            sink.open(new Configuration());
-            for (scala.Tuple3<String, Integer, Integer> value : scalaTupleCollection) {
-                sink.invoke(value, SinkContextUtil.forTimestamp(0));
-            }
-        } finally {
-            sink.close();
-        }
-
-        ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        List<com.datastax.driver.core.Row> rows = rs.all();
-        assertThat(rows).hasSameSizeAs(scalaTupleCollection);
-
-        for (com.datastax.driver.core.Row row : rows) {
-            scalaTupleCollection.remove(
-                    new scala.Tuple3<>(
-                            row.getString(TUPLE_ID_FIELD),
-                            row.getInt(TUPLE_COUNTER_FIELD),
-                            row.getInt(TUPLE_BATCHID_FIELD)));
-        }
-        assertThat(scalaTupleCollection).isEmpty();
-    }
-
-    @TestTemplate
-    void testCassandraScalaTuplePartialColumnUpdate() throws Exception {
-        CassandraSinkBaseConfig config =
-                CassandraSinkBaseConfig.newBuilder().setIgnoreNullFields(true).build();
-        CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink =
-                new CassandraScalaProductSink<>(
-                        injectTableName(INSERT_DATA_QUERY), builderForWriting, config);
-
-        String id = UUID.randomUUID().toString();
-        Integer counter = 1;
-        Integer batchId = 0;
-
-        // Send partial records across multiple requests
-        scala.Tuple3<String, Integer, Integer> scalaTupleRecordFirst =
-                new scala.Tuple3<>(id, counter, null);
-        scala.Tuple3<String, Integer, Integer> scalaTupleRecordSecond =
-                new scala.Tuple3<>(id, null, batchId);
-
-        try {
-            sink.open(new Configuration());
-            sink.invoke(scalaTupleRecordFirst, SinkContextUtil.forTimestamp(0));
-            sink.invoke(scalaTupleRecordSecond, SinkContextUtil.forTimestamp(0));
-        } finally {
-            sink.close();
-        }
-
-        ResultSet rs = session.execute(requestWithTimeout(injectTableName(SELECT_DATA_QUERY)));
-        List<com.datastax.driver.core.Row> rows = rs.all();
-        assertThat(rows).hasSize(1);
-        // Since nulls are ignored, we should be reading one complete record
-        for (com.datastax.driver.core.Row row : rows) {
-            assertThat(
-                            new scala.Tuple3<>(
-                                    row.getString(TUPLE_ID_FIELD),
-                                    row.getInt(TUPLE_COUNTER_FIELD),
-                                    row.getInt(TUPLE_BATCHID_FIELD)))
-                    .isEqualTo(new scala.Tuple3<>(id, counter, batchId));
-        }
-    }
-
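-    // wraps the query in a statement with a raised read timeout, since the containerized
-    // Cassandra can be slow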
-    private static Statement requestWithTimeout(String query) {
-        return new SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS);
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
deleted file mode 100644
index 560ea2a013b..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connectors.cassandra.utils.ResultSetFutures;
-import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.FutureUtils;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assertions.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/** Tests for the {@link CassandraSinkBase}. */
-class CassandraSinkBaseTest {
-
-    private static final long DEFAULT_TEST_TIMEOUT_IN_MILLIS = 5000;
-
-    @Test
-    void testHostNotFoundErrorHandling() throws Exception {
-        CassandraSinkBase base =
-                new CassandraSinkBase(
-                        new ClusterBuilder() {
-                            @Override
-                            protected Cluster buildCluster(Cluster.Builder builder) {
-                                return builder.addContactPoint("127.0.0.1")
-                                        .withoutJMXReporting()
-                                        .withoutMetrics()
-                                        .build();
-                            }
-                        },
-                        CassandraSinkBaseConfig.newBuilder().build(),
-                        new NoOpCassandraFailureHandler()) {
-                    @Override
-                    public ListenableFuture send(Object value) {
-                        return null;
-                    }
-                };
-
-        assertThatThrownBy(() -> base.open(new Configuration()))
-                .isInstanceOf(NoHostAvailableException.class);
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testSuccessfulPath() throws Exception {
-        try (TestCassandraSink casSinkFunc = createOpenedTestCassandraSink()) {
-            casSinkFunc.enqueueCompletableFuture(CompletableFuture.completedFuture(null));
-
-            final int originalPermits = casSinkFunc.getAvailablePermits();
-            assertThat(originalPermits).isGreaterThan(0);
-            assertThat(casSinkFunc.getAcquiredPermits()).isZero();
-
-            casSinkFunc.invoke("hello");
-
-            assertThat(casSinkFunc.getAvailablePermits()).isEqualTo(originalPermits);
-            assertThat(casSinkFunc.getAcquiredPermits()).isZero();
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testThrowErrorOnClose() throws Exception {
-        TestCassandraSink casSinkFunc = new TestCassandraSink();
-
-        casSinkFunc.open(new Configuration());
-
-        Exception cause = new RuntimeException();
-        casSinkFunc.enqueueCompletableFuture(FutureUtils.completedExceptionally(cause));
-        casSinkFunc.invoke("hello");
-        try {
-            casSinkFunc.close();
-
-            fail("Close should have thrown an exception.");
-        } catch (IOException e) {
-            ExceptionUtils.findThrowable(e, candidate -> candidate == cause).orElseThrow(() -> e);
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testThrowErrorOnInvoke() throws Exception {
-        try (TestCassandraSink casSinkFunc = createOpenedTestCassandraSink()) {
-            Exception cause = new RuntimeException();
-            casSinkFunc.enqueueCompletableFuture(FutureUtils.completedExceptionally(cause));
-
-            casSinkFunc.invoke("hello");
-
-            try {
-                casSinkFunc.invoke("world");
-                fail("Sending of second value should have failed.");
-            } catch (IOException e) {
-                assertThat(e.getCause()).isEqualTo(cause);
-                assertThat(casSinkFunc.getAcquiredPermits()).isZero();
-            }
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testIgnoreError() throws Exception {
-        Exception cause = new RuntimeException();
-        CassandraFailureHandler failureHandler = failure -> assertThat(failure).isEqualTo(cause);
-
-        try (TestCassandraSink casSinkFunc = createOpenedTestCassandraSink(failureHandler)) {
-
-            casSinkFunc.enqueueCompletableFuture(FutureUtils.completedExceptionally(cause));
-            casSinkFunc.enqueueCompletableFuture(FutureUtils.completedExceptionally(cause));
-
-            casSinkFunc.invoke("hello");
-            casSinkFunc.invoke("world");
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testThrowErrorOnSnapshot() throws Exception {
-        TestCassandraSink casSinkFunc = new TestCassandraSink();
-
-        try (OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                createOpenedTestHarness(casSinkFunc)) {
-            Exception cause = new RuntimeException();
-            casSinkFunc.enqueueCompletableFuture(FutureUtils.completedExceptionally(cause));
-
-            casSinkFunc.invoke("hello");
-            assertThatThrownBy(() -> testHarness.snapshot(123L, 123L))
-                    .hasCauseInstanceOf(IOException.class);
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testWaitForPendingUpdatesOnSnapshot() throws Exception {
-        final TestCassandraSink casSinkFunc = new TestCassandraSink();
-
-        try (OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                createOpenedTestHarness(casSinkFunc)) {
-            CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
-            casSinkFunc.enqueueCompletableFuture(completableFuture);
-
-            casSinkFunc.invoke("hello");
-            assertThat(casSinkFunc.getAcquiredPermits()).isOne();
-
-            final CountDownLatch latch = new CountDownLatch(1);
-            Thread t =
-                    new CheckedThread("Flink-CassandraSinkBaseTest") {
-                        @Override
-                        public void go() throws Exception {
-                            testHarness.snapshot(123L, 123L);
-                            latch.countDown();
-                        }
-                    };
-            t.start();
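-            // busy-wait until the snapshot thread blocks on the pending request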
-            while (t.getState() != Thread.State.TIMED_WAITING) {
-                Thread.sleep(5);
-            }
-
-            assertThat(casSinkFunc.getAcquiredPermits()).isOne();
-            completableFuture.complete(null);
-            latch.await();
-            assertThat(casSinkFunc.getAcquiredPermits()).isZero();
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testWaitForPendingUpdatesOnClose() throws Exception {
-        TestCassandraSink casSinkFunc = new TestCassandraSink();
-
-        try (OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                createOpenedTestHarness(casSinkFunc)) {
-
-            CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
-            casSinkFunc.enqueueCompletableFuture(completableFuture);
-
-            casSinkFunc.invoke("hello");
-            assertThat(casSinkFunc.getAcquiredPermits()).isOne();
-
-            final CountDownLatch latch = new CountDownLatch(1);
-            Thread t =
-                    new CheckedThread("Flink-CassandraSinkBaseTest") {
-                        @Override
-                        public void go() throws Exception {
-                            testHarness.close();
-                            latch.countDown();
-                        }
-                    };
-            t.start();
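-            // busy-wait until the closing thread blocks on the pending request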
-            while (t.getState() != Thread.State.TIMED_WAITING) {
-                Thread.sleep(5);
-            }
-
-            assertThat(casSinkFunc.getAcquiredPermits()).isOne();
-            completableFuture.complete(null);
-            latch.await();
-            assertThat(casSinkFunc.getAcquiredPermits()).isZero();
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testReleaseOnSuccess() throws Exception {
-        final CassandraSinkBaseConfig config =
-                CassandraSinkBaseConfig.newBuilder().setMaxConcurrentRequests(1).build();
-
-        try (TestCassandraSink testCassandraSink = createOpenedTestCassandraSink(config)) {
-            assertThat(testCassandraSink.getAvailablePermits()).isOne();
-            assertThat(testCassandraSink.getAcquiredPermits()).isZero();
-
-            CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
-            testCassandraSink.enqueueCompletableFuture(completableFuture);
-            testCassandraSink.invoke("N/A");
-
-            assertThat(testCassandraSink.getAvailablePermits()).isZero();
-            assertThat(testCassandraSink.getAcquiredPermits()).isOne();
-
-            completableFuture.complete(null);
-
-            assertThat(testCassandraSink.getAvailablePermits()).isOne();
-            assertThat(testCassandraSink.getAcquiredPermits()).isZero();
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testReleaseOnFailure() throws Exception {
-        final CassandraSinkBaseConfig config =
-                CassandraSinkBaseConfig.newBuilder().setMaxConcurrentRequests(1).build();
-        final CassandraFailureHandler failureHandler = ignored -> {};
-
-        try (TestCassandraSink testCassandraSink =
-                createOpenedTestCassandraSink(config, failureHandler)) {
-            assertThat(testCassandraSink.getAvailablePermits()).isOne();
-            assertThat(testCassandraSink.getAcquiredPermits()).isZero();
-
-            CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
-            testCassandraSink.enqueueCompletableFuture(completableFuture);
-            testCassandraSink.invoke("N/A");
-
-            assertThat(testCassandraSink.getAvailablePermits()).isZero();
-            assertThat(testCassandraSink.getAcquiredPermits()).isOne();
-
-            completableFuture.completeExceptionally(new RuntimeException());
-
-            assertThat(testCassandraSink.getAvailablePermits()).isOne();
-            assertThat(testCassandraSink.getAcquiredPermits()).isZero();
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testReleaseOnThrowingSend() throws Exception {
-        final CassandraSinkBaseConfig config =
-                CassandraSinkBaseConfig.newBuilder().setMaxConcurrentRequests(1).build();
-
-        Function<String, ListenableFuture<ResultSet>> failingSendFunction =
-                ignoredMessage -> {
-                    throwCheckedAsUnchecked(new Throwable("expected"));
-                    //noinspection ReturnOfNull
-                    return null;
-                };
-
-        try (TestCassandraSink testCassandraSink =
-                new MockCassandraSink(config, failingSendFunction)) {
-            testCassandraSink.open(new Configuration());
-            assertThat(testCassandraSink.getAvailablePermits()).isOne();
-            assertThat(testCassandraSink.getAcquiredPermits()).isZero();
-
-            //noinspection OverlyBroadCatchBlock,NestedTryStatement
-            try {
-                testCassandraSink.invoke("none");
-            } catch (Throwable e) {
-                assertThat(e).hasMessage("expected");
-                assertThat(testCassandraSink.getAvailablePermits()).isOne();
-                assertThat(testCassandraSink.getAcquiredPermits()).isZero();
-            }
-        }
-    }
-
-    @Test
-    @Timeout(value = DEFAULT_TEST_TIMEOUT_IN_MILLIS, unit = TimeUnit.MILLISECONDS)
-    void testTimeoutExceptionOnInvoke() throws Exception {
-        final CassandraSinkBaseConfig config =
-                CassandraSinkBaseConfig.newBuilder()
-                        .setMaxConcurrentRequests(1)
-                        .setMaxConcurrentRequestsTimeout(Duration.ofMillis(1))
-                        .build();
-
-        try (TestCassandraSink testCassandraSink = createOpenedTestCassandraSink(config)) {
-            CompletableFuture<ResultSet> completableFuture = new CompletableFuture<>();
-            testCassandraSink.enqueueCompletableFuture(completableFuture);
-            testCassandraSink.enqueueCompletableFuture(completableFuture);
-            testCassandraSink.invoke("Invoke #1");
-
-            try {
-                testCassandraSink.invoke("Invoke #2");
-                fail("Sending value should have experienced a TimeoutException");
-            } catch (Exception e) {
-                assertThat(e).isInstanceOf(TimeoutException.class);
-            } finally {
-                completableFuture.complete(null);
-            }
-        }
-    }
-
-    private TestCassandraSink createOpenedTestCassandraSink() {
-        final TestCassandraSink testCassandraSink = new TestCassandraSink();
-        testCassandraSink.open(new Configuration());
-        return testCassandraSink;
-    }
-
-    private TestCassandraSink createOpenedTestCassandraSink(
-            CassandraFailureHandler failureHandler) {
-        final TestCassandraSink testCassandraSink = new TestCassandraSink(failureHandler);
-        testCassandraSink.open(new Configuration());
-        return testCassandraSink;
-    }
-
-    private TestCassandraSink createOpenedTestCassandraSink(CassandraSinkBaseConfig config) {
-        final TestCassandraSink testCassandraSink = new TestCassandraSink(config);
-        testCassandraSink.open(new Configuration());
-        return testCassandraSink;
-    }
-
-    private TestCassandraSink createOpenedTestCassandraSink(
-            CassandraSinkBaseConfig config, CassandraFailureHandler failureHandler) {
-        final TestCassandraSink testCassandraSink = new TestCassandraSink(config, failureHandler);
-        testCassandraSink.open(new Configuration());
-        return testCassandraSink;
-    }
-
-    private OneInputStreamOperatorTestHarness<String, Object> createOpenedTestHarness(
-            TestCassandraSink testCassandraSink) throws Exception {
-        final StreamSink<String> testStreamSink = new StreamSink<>(testCassandraSink);
-        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                new OneInputStreamOperatorTestHarness<>(testStreamSink);
-        testHarness.open();
-        return testHarness;
-    }
-
-    private static <T extends Throwable> void throwCheckedAsUnchecked(Throwable ex) throws T {
-        //noinspection unchecked
-        throw (T) ex;
-    }
-
-    private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet>
-            implements AutoCloseable {
-
-        private static final ClusterBuilder builder;
-        private static final Cluster cluster;
-        private static final Session session;
-
-        static {
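-            // mock a Cluster whose connect() returns a mocked Session, so that no real
-            // Cassandra instance is needed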
-            cluster = mock(Cluster.class);
-
-            session = mock(Session.class);
-            when(cluster.connect()).thenReturn(session);
-
-            builder =
-                    new ClusterBuilder() {
-                        @Override
-                        protected Cluster buildCluster(Cluster.Builder builder) {
-                            return cluster;
-                        }
-                    };
-        }
-
-        private final Queue<ListenableFuture<ResultSet>> resultSetFutures = new LinkedList<>();
-
-        TestCassandraSink() {
-            this(CassandraSinkBaseConfig.newBuilder().build());
-        }
-
-        TestCassandraSink(CassandraSinkBaseConfig config) {
-            this(config, new NoOpCassandraFailureHandler());
-        }
-
-        TestCassandraSink(CassandraFailureHandler failureHandler) {
-            this(CassandraSinkBaseConfig.newBuilder().build(), failureHandler);
-        }
-
-        TestCassandraSink(CassandraSinkBaseConfig config, CassandraFailureHandler failureHandler) {
-            super(builder, config, failureHandler);
-        }
-
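-        // returns the next pre-enqueued future, letting tests control when and how a
-        // request completes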
-        @Override
-        public ListenableFuture<ResultSet> send(String value) {
-            return resultSetFutures.poll();
-        }
-
-        void enqueueCompletableFuture(CompletableFuture<ResultSet> completableFuture) {
-            Preconditions.checkNotNull(completableFuture);
-            resultSetFutures.offer(ResultSetFutures.fromCompletableFuture(completableFuture));
-        }
-    }
-
-    private static class MockCassandraSink extends TestCassandraSink {
-        private static final long serialVersionUID = -3363195776692829911L;
-
-        private final Function<String, ListenableFuture<ResultSet>> sendFunction;
-
-        MockCassandraSink(
-                CassandraSinkBaseConfig config,
-                Function<String, ListenableFuture<ResultSet>> sendFunction) {
-            super(config, new NoOpCassandraFailureHandler());
-            this.sendFunction = sendFunction;
-        }
-
-        @Override
-        public ListenableFuture<ResultSet> send(String value) {
-            return this.sendFunction.apply(value);
-        }
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
deleted file mode 100644
index 0ca238f4ba4..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.mockito.ArgumentMatchers;
-
-import java.util.Collections;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/** Tests for the {@link CassandraTupleWriteAheadSink}. */
-class CassandraTupleWriteAheadSinkTest {
-
-    @Test
-    @Timeout(value = 20_000, unit = TimeUnit.MILLISECONDS)
-    void testAckLoopExitOnException() throws Exception {
-        final AtomicReference<Runnable> runnableFuture = new AtomicReference<>();
-
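-        // mock the driver so that statement preparation succeeds but future.get() throws,
-        // simulating a write that fails asynchronously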
-        final ClusterBuilder clusterBuilder =
-                new ClusterBuilder() {
-                    private static final long serialVersionUID = 4624400760492936756L;
-
-                    @Override
-                    protected Cluster buildCluster(Cluster.Builder builder) {
-                        try {
-                            BoundStatement boundStatement = mock(BoundStatement.class);
-                            when(boundStatement.setDefaultTimestamp(any(long.class)))
-                                    .thenReturn(boundStatement);
-
-                            PreparedStatement preparedStatement = mock(PreparedStatement.class);
-                            when(preparedStatement.bind(ArgumentMatchers.any()))
-                                    .thenReturn(boundStatement);
-
-                            ResultSetFuture future = mock(ResultSetFuture.class);
-                            when(future.get())
-                                    .thenThrow(new RuntimeException("Expected exception."));
-
-                            doAnswer(
-                                            invocationOnMock -> {
-                                                synchronized (runnableFuture) {
-                                                    runnableFuture.set(
-                                                            (Runnable)
-                                                                    invocationOnMock
-                                                                            .getArguments()[0]);
-                                                    runnableFuture.notifyAll();
-                                                }
-                                                return null;
-                                            })
-                                    .when(future)
-                                    .addListener(any(Runnable.class), any(Executor.class));
-
-                            Session session = mock(Session.class);
-                            when(session.prepare(anyString())).thenReturn(preparedStatement);
-                            when(session.executeAsync(any(BoundStatement.class)))
-                                    .thenReturn(future);
-
-                            Cluster cluster = mock(Cluster.class);
-                            when(cluster.connect()).thenReturn(session);
-                            return cluster;
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                };
-
-        // Our asynchronous executor thread
-        new Thread(
-                        () -> {
-                            synchronized (runnableFuture) {
-                                while (runnableFuture.get() == null) {
-                                    try {
-                                        runnableFuture.wait();
-                                    } catch (InterruptedException e) {
-                                        // ignore interrupts
-                                    }
-                                }
-                            }
-                            runnableFuture.get().run();
-                        })
-                .start();
-
-        CheckpointCommitter cc = mock(CheckpointCommitter.class);
-        final CassandraTupleWriteAheadSink<Tuple0> sink =
-                new CassandraTupleWriteAheadSink<>(
-                        "abc",
-                        TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()),
-                        clusterBuilder,
-                        cc);
-
-        OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness =
-                new OneInputStreamOperatorTestHarness<>(sink);
-        harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true);
-
-        harness.setup();
-        sink.open();
-
-        // we should leave the loop and return false since we've seen an exception
-        assertThat(sink.sendValues(Collections.singleton(new Tuple0()), 1L, 0L)).isFalse();
-
-        sink.close();
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
deleted file mode 100644
index 2efde660aa6..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.mapping.annotations.Column;
-
-import java.io.Serializable;
-
-/** Test Pojo with DataStax annotations created dynamically. */
-public class Pojo implements Serializable {
-
-    private static final long serialVersionUID = 1038054554690916991L;
-
-    @Column(name = "id")
-    private String id;
-
-    @Column(name = "counter")
-    private int counter;
-
-    @Column(name = "batch_id")
-    private int batchID;
-
-    // required for deserialization
-    public Pojo() {}
-
-    public Pojo(String id, int counter, int batchID) {
-        this.id = id;
-        this.counter = counter;
-        this.batchID = batchID;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public int getCounter() {
-        return counter;
-    }
-
-    public void setCounter(int counter) {
-        this.counter = counter;
-    }
-
-    public int getBatchID() {
-        return batchID;
-    }
-
-    public void setBatchID(int batchId) {
-        this.batchID = batchId;
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
deleted file mode 100644
index f2981064331..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import com.datastax.driver.mapping.Mapper;
-
-import java.util.ArrayList;
-
-/**
- * This is an example showing how to use the Pojo Cassandra Sink in the Streaming API.
- *
- * <p>Pojos have to be annotated with DataStax annotations to work with this sink.
- *
- * <p>The example assumes that a table exists in a local Cassandra database, according to the
- * following queries: CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class':
- * 'SimpleStrategy', 'replication_factor': '1'}; CREATE TABLE IF NOT EXISTS test.message(body text
- * PRIMARY KEY)
- */
-public class CassandraPojoSinkExample {
-    private static final ArrayList<Message> messages = new ArrayList<>(20);
-
-    static {
-        for (long i = 0; i < 20; i++) {
-            messages.add(new Message("cassandra-" + i));
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        DataStreamSource<Message> source = env.fromCollection(messages);
-
-        CassandraSink.addSink(source)
-                .setClusterBuilder(
-                        new ClusterBuilder() {
-                            @Override
-                            protected Cluster buildCluster(Builder builder) {
-                                return builder.addContactPoint("127.0.0.1").build();
-                            }
-                        })
-                .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
-                .build();
-
-        env.execute("Cassandra Sink example");
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
deleted file mode 100644
index da5706eb5d8..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-
-import java.util.ArrayList;
-
-/**
- * This is an example showing how to use the Tuple Cassandra Sink in the Streaming API.
- *
- * <p>The example assumes that a table exists in a local Cassandra database, according to the
- * following queries: CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class':
- * 'SimpleStrategy', 'replication_factor': '1'}; CREATE TABLE IF NOT EXISTS test.writetuple(element1
- * text PRIMARY KEY, element2 int)
- */
-public class CassandraTupleSinkExample {
-    private static final String INSERT =
-            "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)";
-    private static final ArrayList<Tuple2<String, Integer>> collection = new ArrayList<>(20);
-
-    static {
-        for (int i = 0; i < 20; i++) {
-            collection.add(new Tuple2<>("cassandra-" + i, i));
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);
-
-        CassandraSink.addSink(source)
-                .setQuery(INSERT)
-                .setClusterBuilder(
-                        new ClusterBuilder() {
-                            @Override
-                            protected Cluster buildCluster(Builder builder) {
-                                return builder.addContactPoint("127.0.0.1").build();
-                            }
-                        })
-                .build();
-
-        env.execute("WriteTupleIntoCassandra");
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
deleted file mode 100644
index f9d616b026c..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-
-import com.datastax.driver.core.Cluster;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * This is an example showing how to use the Cassandra Sink (with write-ahead log) in the
- * Streaming API.
- *
- * <p>The example assumes that a table exists in a local Cassandra database, according to the
- * following queries: CREATE KEYSPACE IF NOT EXISTS example WITH replication = {'class':
- * 'SimpleStrategy', 'replication_factor': '1'}; CREATE TABLE example.values (id text, counter int,
- * PRIMARY KEY(id));
- *
- * <p>Important things to note are that checkpointing is enabled, a StateBackend is set, and
- * enableWriteAheadLog() is called when creating the CassandraSink.
- */
-public class CassandraTupleWriteAheadSinkExample {
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
-        env.setStateBackend(
-                new FsStateBackend(
-                        "file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
-
-        CassandraSink<Tuple2<String, Integer>> sink =
-                CassandraSink.addSink(env.addSource(new MySource()))
-                        .setQuery("INSERT INTO example.values (id, count) VALUES (?, ?);")
-                        .enableWriteAheadLog()
-                        .setClusterBuilder(
-                                new ClusterBuilder() {
-
-                                    private static final long serialVersionUID =
-                                            2793938419775311824L;
-
-                                    @Override
-                                    public Cluster buildCluster(Cluster.Builder builder) {
-                                        return builder.addContactPoint("127.0.0.1").build();
-                                    }
-                                })
-                        .build();
-
-        sink.name("Cassandra Sink").disableChaining().setParallelism(1).uid("hello");
-
-        env.execute();
-    }
-
-    private static class MySource
-            implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> {
-        private static final long serialVersionUID = 4022367939215095610L;
-
-        private int counter = 0;
-        private boolean stop = false;
-
-        @Override
-        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-            while (!stop) {
-                Thread.sleep(50);
-                ctx.collect(new Tuple2<>(UUID.randomUUID().toString(), 1));
-                counter++;
-                if (counter == 100) {
-                    stop = true;
-                }
-            }
-        }
-
-        @Override
-        public void cancel() {
-            stop = true;
-        }
-
-        @Override
-        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
-            return Collections.singletonList(this.counter);
-        }
-
-        @Override
-        public void restoreState(List<Integer> state) throws Exception {
-            if (state.size() != 1) {
-                throw new RuntimeException(
-                        "Test failed due to unexpected recovered state size " + state.size());
-            }
-            this.counter = state.get(0);
-        }
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
deleted file mode 100644
index b9103f6a88c..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra.example;
-
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.Table;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/** Pojo with DataStax annotations. */
-@Table(keyspace = "test", name = "message")
-public class Message implements Serializable {
-
-    private static final long serialVersionUID = 1123119384361005680L;
-
-    @Column(name = "body")
-    private String message;
-
-    public Message() {
-        this(null);
-    }
-
-    public Message(String word) {
-        this.message = word;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    public void setMessage(String word) {
-        this.message = word;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (other instanceof Message) {
-            Message that = (Message) other;
-            return Objects.equals(this.message, that.message);
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(message);
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-connectors/flink-connector-cassandra/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
deleted file mode 100644
index 28999133c2b..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.util.TestLoggerExtension
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-cassandra/src/test/resources/archunit.properties b/flink-connectors/flink-connector-cassandra/src/test/resources/archunit.properties
deleted file mode 100644
index 15be88c95ba..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/resources/archunit.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# By default we allow removing existing violations, but fail when new violations are added.
-freeze.store.default.allowStoreUpdate=true
-
-# Enable this if a new (frozen) rule has been added in order to create the initial store and record the existing violations.
-#freeze.store.default.allowStoreCreation=true
-
-# Enable this to allow new violations to be recorded.
-# NOTE: Adding new violations should be avoided when possible. If the rule was correct to flag a new
-#       violation, please try to avoid creating the violation. If the violation was created due to a
-#       shortcoming of the rule, file a JIRA issue so the rule can be improved.
-#freeze.refreeze=true
-
-freeze.store.default.path=archunit-violations
diff --git a/flink-connectors/flink-connector-cassandra/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-cassandra/src/test/resources/log4j2-test.properties
deleted file mode 100644
index 835c2ec9a3d..00000000000
--- a/flink-connectors/flink-connector-cassandra/src/test/resources/log4j2-test.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to avoid flooding build logs.
-# Set it manually to INFO for debugging purposes.
-rootLogger.level = OFF
-rootLogger.appenderRef.test.ref = TestLogger
-
-appender.testlogger.name = TestLogger
-appender.testlogger.type = CONSOLE
-appender.testlogger.target = SYSTEM_ERR
-appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index f05ff1e1b7c..6489d56741e 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -42,7 +42,6 @@ under the License.
 		<module>flink-connector-hive</module>
 		<module>flink-connector-jdbc</module>
 		<module>flink-connector-rabbitmq</module>
-		<module>flink-connector-cassandra</module>
 		<module>flink-connector-kafka</module>
 		<module>flink-connector-gcp-pubsub</module>
 		<module>flink-connector-aws-base</module>
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 90f5217d42a..a5850c5c05c 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -330,7 +330,7 @@ under the License.
 			<!-- Indirectly accessed in pyflink_gateway_server -->
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
+			<version>3.0.0-1.16</version>
 			<scope>test</scope>
 		</dependency>
 
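The Cassandra dependency above is now pinned to the externalized
connector release instead of ${project.version}. A hedged sketch of how
a user project would declare the externalized artifact (coordinates are
taken from the diff above; the 2.12 Scala suffix is an assumption, so
substitute the suffix matching your build):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.12</artifactId>
        <version>3.0.0-1.16</version>
    </dependency>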
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
index d2337767f29..434cf4998e4 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
@@ -40,8 +40,6 @@ public class DockerImageVersions {
 
     public static final String PULSAR = "apachepulsar/pulsar:2.10.2";
 
-    public static final String CASSANDRA_4_0 = "cassandra:4.0.3";
-
     public static final String MINIO = "minio/minio:RELEASE.2022-02-07T08-17-33Z";
 
     public static final String ZOOKEEPER = "zookeeper:3.4.14";
diff --git a/tools/azure-pipelines/cache_docker_images.sh b/tools/azure-pipelines/cache_docker_images.sh
index 6a6ccbd99cb..348c6412d93 100755
--- a/tools/azure-pipelines/cache_docker_images.sh
+++ b/tools/azure-pipelines/cache_docker_images.sh
@@ -28,7 +28,7 @@ then
 fi
 
 # This is the pattern that determines which containers we save.
-DOCKER_IMAGE_CACHE_PATTERN="testcontainers|kafka|postgres|mysql|pulsar|cassandra|schema-registry"
+DOCKER_IMAGE_CACHE_PATTERN="testcontainers|kafka|postgres|mysql|pulsar|schema-registry"
 
 # The path to the tar file that will contain the saved docker images.
 DOCKER_IMAGES_CACHE_PATH="${DOCKER_IMAGES_CACHE_FOLDER}/cache.tar"
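
The cache pattern above is an extended regular expression matched
against local image names; dropping "cassandra" means those images are
no longer saved between CI runs. A minimal sketch of how such a pattern
could drive the cache step (the rest of the script body is not part of
this diff, so the pipeline below is an assumption):

    # Save every locally present image whose name matches the pattern.
    docker images --format '{{.Repository}}:{{.Tag}}' \
        | grep -E "$DOCKER_IMAGE_CACHE_PATTERN" \
        | xargs --no-run-if-empty docker save -o "$DOCKER_IMAGES_CACHE_PATH"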
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index 860da6b8881..c94e0377226 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -126,7 +126,6 @@ flink-connectors/flink-hadoop-compatibility,\
 flink-connectors,\
 flink-connectors/flink-connector-files,\
 flink-connectors/flink-connector-jdbc,\
-flink-connectors/flink-connector-cassandra,\
 flink-metrics/flink-metrics-dropwizard,\
 flink-metrics/flink-metrics-graphite,\
 flink-metrics/flink-metrics-jmx,\


[flink] 01/02: [hotfix][python] Depend on externalized ES connector

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7bb75ab6d9603713c8a3e20295e3d1f551816603
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Dec 6 11:32:20 2022 +0100

    [hotfix][python] Depend on externalized ES connector
---
 flink-python/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 7ac4959ff94..90f5217d42a 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -282,7 +282,7 @@ under the License.
 			<!-- Indirectly accessed in pyflink_gateway_server -->
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-sql-connector-elasticsearch7</artifactId>
-			<version>1.16.0</version>
+			<version>3.0.0-1.16</version>
 			<scope>test</scope>
 		</dependency>