You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/02/22 07:31:13 UTC

[flink] branch release-1.13 updated (b0d0a00 -> 965774c)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b0d0a00  [FLINK-25227][table] Boxed numeric type should be considered when generating code for equality checking
     new 42db40c  [FLINK-25851][build][tests] Bump bytebuddy to 1.8.22
     new 965774c  [FLINK-25851][cassandra][tests] Inject dynamic table name into Pojos

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/CustomCassandraAnnotatedPojo.java    |  70 -----
 .../cassandra/example/BatchPojoExample.java        |  17 +-
 .../connectors/cassandra/example/Pojo.java}        |  13 +-
 .../cassandra/CassandraConnectorITCase.java        | 333 ++++++++++++---------
 .../flink/streaming/connectors/cassandra/Pojo.java |   7 +-
 pom.xml                                            |   4 +-
 6 files changed, 217 insertions(+), 227 deletions(-)
 delete mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
 rename flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/{streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java => batch/connectors/cassandra/example/Pojo.java} (84%)

[flink] 01/02: [FLINK-25851][build][tests] Bump bytebuddy to 1.8.22

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 42db40c33eae39518e16640ebdf093594590c1da
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Feb 17 10:16:49 2022 +0100

    [FLINK-25851][build][tests] Bump bytebuddy to 1.8.22
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index af10c87..1adc7f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -492,14 +492,14 @@ under the License.
 				<!-- mockito/powermock mismatch -->
 				<groupId>net.bytebuddy</groupId>
 				<artifactId>byte-buddy</artifactId>
-				<version>1.8.15</version>
+				<version>1.8.22</version>
 			</dependency>
 
 			<dependency>
 				<!-- mockito/powermock mismatch -->
 				<groupId>net.bytebuddy</groupId>
 				<artifactId>byte-buddy-agent</artifactId>
-				<version>1.8.15</version>
+				<version>1.8.22</version>
 			</dependency>
 
 			<!-- For dependency convergence -->

[flink] 02/02: [FLINK-25851][cassandra][tests] Inject dynamic table name into Pojos

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 965774ca1c4923d1c5c78bbc5f1a7b26262e6682
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 28 13:05:26 2022 +0100

    [FLINK-25851][cassandra][tests] Inject dynamic table name into Pojos
---
 .../cassandra/CustomCassandraAnnotatedPojo.java    |  70 -----
 .../cassandra/example/BatchPojoExample.java        |  17 +-
 .../connectors/cassandra/example/Pojo.java}        |  13 +-
 .../cassandra/CassandraConnectorITCase.java        | 333 ++++++++++++---------
 .../flink/streaming/connectors/cassandra/Pojo.java |   7 +-
 5 files changed, 215 insertions(+), 225 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
deleted file mode 100644
index a62cda3..0000000
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.cassandra;
-
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.Table;
-
-/** Example of Cassandra Annotated POJO class for use with {@link CassandraPojoInputFormat}. */
-@Table(name = CustomCassandraAnnotatedPojo.TABLE_NAME, keyspace = "flink")
-public class CustomCassandraAnnotatedPojo {
-
-    public static final String TABLE_NAME = "batches";
-
-    @Column(name = "id")
-    private String id;
-
-    @Column(name = "counter")
-    private Integer counter;
-
-    @Column(name = "batch_id")
-    private Integer batchId;
-
-    /** Necessary for the driver's mapper instanciation. */
-    public CustomCassandraAnnotatedPojo() {}
-
-    public CustomCassandraAnnotatedPojo(String id, Integer counter, Integer batchId) {
-        this.id = id;
-        this.counter = counter;
-        this.batchId = batchId;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public Integer getCounter() {
-        return counter;
-    }
-
-    public void setCounter(Integer counter) {
-        this.counter = counter;
-    }
-
-    public Integer getBatchId() {
-        return batchId;
-    }
-
-    public void setBatchId(Integer batchId) {
-        this.batchId = batchId;
-    }
-}
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
index fb6733e..764c001 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
-import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
 import com.datastax.driver.core.Cluster;
@@ -50,16 +49,12 @@ public class BatchPojoExample {
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
 
-        List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos =
+        List<Pojo> customCassandraAnnotatedPojos =
                 IntStream.range(0, 20)
-                        .mapToObj(
-                                x ->
-                                        new CustomCassandraAnnotatedPojo(
-                                                UUID.randomUUID().toString(), x, 0))
+                        .mapToObj(x -> new Pojo(UUID.randomUUID().toString(), x, 0))
                         .collect(Collectors.toList());
 
-        DataSet<CustomCassandraAnnotatedPojo> dataSet =
-                env.fromCollection(customCassandraAnnotatedPojos);
+        DataSet<Pojo> dataSet = env.fromCollection(customCassandraAnnotatedPojos);
 
         ClusterBuilder clusterBuilder =
                 new ClusterBuilder() {
@@ -74,7 +69,7 @@ public class BatchPojoExample {
         dataSet.output(
                 new CassandraPojoOutputFormat<>(
                         clusterBuilder,
-                        CustomCassandraAnnotatedPojo.class,
+                        Pojo.class,
                         () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}));
 
         env.execute("Write");
@@ -82,12 +77,12 @@ public class BatchPojoExample {
         /*
          *	This is for the purpose of showing an example of creating a DataSet using CassandraPojoInputFormat.
          */
-        DataSet<CustomCassandraAnnotatedPojo> inputDS =
+        DataSet<Pojo> inputDS =
                 env.createInput(
                         new CassandraPojoInputFormat<>(
                                 SELECT_QUERY,
                                 clusterBuilder,
-                                CustomCassandraAnnotatedPojo.class,
+                                Pojo.class,
                                 () ->
                                         new Mapper.Option[] {
                                             Mapper.Option.consistencyLevel(ConsistencyLevel.ANY)
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
similarity index 84%
rename from flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
rename to flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
index aa74310..559f107 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/PojoNoAnnotatedKeyspace.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.cassandra;
+package org.apache.flink.batch.connectors.cassandra.example;
 
 import com.datastax.driver.mapping.annotations.Column;
 import com.datastax.driver.mapping.annotations.Table;
 
 import java.io.Serializable;
 
-/** Test Pojo with DataStax annotations used (no keyspace). */
-@Table(name = "testPojoNoAnnotatedKeyspace")
-public class PojoNoAnnotatedKeyspace implements Serializable {
+/** Test Pojo with DataStax annotations used. */
+@Table(keyspace = "flink", name = "batches")
+public class Pojo implements Serializable {
 
     private static final long serialVersionUID = 1038054554690916991L;
 
@@ -37,7 +37,10 @@ public class PojoNoAnnotatedKeyspace implements Serializable {
     @Column(name = "batch_id")
     private int batchID;
 
-    public PojoNoAnnotatedKeyspace(String id, int counter, int batchID) {
+    // required for deserialization
+    public Pojo() {}
+
+    public Pojo(String id, int counter, int batchID) {
         this.id = id;
         this.counter = counter;
         this.batchID = batchID;
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index a950204..0d33ed7 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -36,7 +36,6 @@ import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
-import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -57,6 +56,10 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.annotations.Table;
+import net.bytebuddy.ByteBuddy;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -73,19 +76,18 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.images.builder.Transferable;
 
 import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
@@ -108,8 +110,6 @@ public class CassandraConnectorITCase
     private static final long CONNECTION_RETRY_DELAY = 500L;
     private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
     private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
-    private static final String TABLE_POJO = "test";
-    private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace";
 
     @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
@@ -206,6 +206,61 @@ public class CassandraConnectorITCase
         }
     }
 
+    private static Class<? extends Pojo> annotatePojoWithTable(String keyspace, String tableName) {
+        return new ByteBuddy()
+                .redefine(Pojo.class)
+                .name("org.apache.flink.streaming.connectors.cassandra.Pojo" + tableName)
+                .annotateType(createTableAnnotation(keyspace, tableName))
+                .make()
+                .load(Pojo.class.getClassLoader())
+                .getLoaded();
+    }
+
+    @NotNull
+    private static Table createTableAnnotation(String keyspace, String tableName) {
+        return new Table() {
+
+            @Override
+            public String keyspace() {
+                return keyspace;
+            }
+
+            @Override
+            public String name() {
+                return tableName;
+            }
+
+            @Override
+            public boolean caseSensitiveKeyspace() {
+                return false;
+            }
+
+            @Override
+            public boolean caseSensitiveTable() {
+                return false;
+            }
+
+            @Override
+            public String writeConsistency() {
+                return "";
+            }
+
+            @Override
+            public String readConsistency() {
+                return "";
+            }
+
+            @Override
+            public Class<? extends Annotation> annotationType() {
+                return Table.class;
+            }
+        };
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utility methods
+    // ------------------------------------------------------------------------
+
     public static CassandraContainer createCassandraContainer() {
         CassandraContainer cassandra = new CassandraContainer(CASSANDRA_3);
         cassandra.withJmxReporting(false);
@@ -213,6 +268,87 @@ public class CassandraConnectorITCase
         return cassandra;
     }
 
+    private static void raiseCassandraRequestsTimeouts() {
+        try {
+            final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+            CASSANDRA_CONTAINER.copyFileFromContainer(
+                    "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
+            String configuration =
+                    new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
+            String patchedConfiguration =
+                    configuration
+                            .replaceAll(
+                                    "request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
+                            .replaceAll(
+                                    "read_request_timeout_in_ms: [0-9]+",
+                                    "read_request_timeout_in_ms: 15000")
+                            .replaceAll(
+                                    "write_request_timeout_in_ms: [0-9]+",
+                                    "write_request_timeout_in_ms: 6000");
+            CASSANDRA_CONTAINER.copyFileToContainer(
+                    Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
+                    "/etc/cassandra/cassandra.yaml");
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to open Cassandra configuration file ", e);
+        }
+    }
+
+    private <T> List<T> readPojosWithInputFormat(Class<T> annotatedPojoClass) {
+        final CassandraPojoInputFormat<T> source =
+                new CassandraPojoInputFormat<>(
+                        injectTableName(SELECT_DATA_QUERY), builderForReading, annotatedPojoClass);
+        List<T> result = new ArrayList<>();
+
+        try {
+            source.configure(new Configuration());
+            source.open(null);
+            while (!source.reachedEnd()) {
+                T temp = source.nextRecord(null);
+                result.add(temp);
+            }
+        } finally {
+            source.close();
+        }
+        return result;
+    }
+
+    private <T> List<T> writePojosWithOutputFormat(Class<T> annotatedPojoClass) throws Exception {
+        final CassandraPojoOutputFormat<T> sink =
+                new CassandraPojoOutputFormat<>(
+                        builderForWriting,
+                        annotatedPojoClass,
+                        () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});
+
+        final Constructor<T> pojoConstructor = getPojoConstructor(annotatedPojoClass);
+        List<T> pojos = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            pojos.add(pojoConstructor.newInstance(UUID.randomUUID().toString(), i, 0));
+        }
+        try {
+            sink.configure(new Configuration());
+            sink.open(0, 1);
+            for (T pojo : pojos) {
+                sink.writeRecord(pojo);
+            }
+        } finally {
+            sink.close();
+        }
+        return pojos;
+    }
+
+    private <T> Constructor<T> getPojoConstructor(Class<T> annotatedPojoClass)
+            throws NoSuchMethodException {
+        return annotatedPojoClass.getConstructor(String.class, Integer.TYPE, Integer.TYPE);
+    }
+
+    private String injectTableName(String target) {
+        return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Tests initialization
+    // ------------------------------------------------------------------------
+
     @BeforeClass
     public static void startAndInitializeCassandra() {
         raiseCassandraRequestsTimeouts();
@@ -249,46 +385,6 @@ public class CassandraConnectorITCase
                 CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
     }
 
-    private static void raiseCassandraRequestsTimeouts() {
-        try {
-            final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
-            CASSANDRA_CONTAINER.copyFileFromContainer(
-                    "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
-            String configuration =
-                    new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
-            String patchedConfiguration =
-                    configuration
-                            .replaceAll(
-                                    "request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
-                            .replaceAll(
-                                    "read_request_timeout_in_ms: [0-9]+",
-                                    "read_request_timeout_in_ms: 15000")
-                            .replaceAll(
-                                    "write_request_timeout_in_ms: [0-9]+",
-                                    "write_request_timeout_in_ms: 6000");
-            CASSANDRA_CONTAINER.copyFileToContainer(
-                    Transferable.of(patchedConfiguration.getBytes(StandardCharsets.UTF_8)),
-                    "/etc/cassandra/cassandra.yaml");
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to open Cassandra configuration file ", e);
-        }
-    }
-
-    @Test
-    public void testRaiseCassandraRequestsTimeouts() throws IOException {
-        // raiseCassandraRequestsTimeouts() was already called in @BeforeClass,
-        // do not change the container conf twice, just assert that it was indeed changed in the
-        // container
-        final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
-        CASSANDRA_CONTAINER.copyFileFromContainer(
-                "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
-        final String configuration =
-                new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
-        assertTrue(configuration.contains("request_timeout_in_ms: 30000"));
-        assertTrue(configuration.contains("read_request_timeout_in_ms: 15000"));
-        assertTrue(configuration.contains("write_request_timeout_in_ms: 6000"));
-    }
-
     @Before
     public void createTable() {
         tableID = random.nextInt(Integer.MAX_VALUE);
@@ -299,12 +395,8 @@ public class CassandraConnectorITCase
     public void dropTables() {
         // need to drop tables in case of retrials. Need to drop all the tables
         // that are created in test because this method is executed with every test
-        session.execute(
-                DROP_TABLE_QUERY.replace(
-                        TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME));
-        session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
-        session.execute(
-                DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE));
+        session.execute(DROP_KEYSPACE_QUERY);
+        session.execute(CREATE_KEYSPACE_QUERY);
     }
 
     @AfterClass
@@ -319,6 +411,34 @@ public class CassandraConnectorITCase
     }
 
     // ------------------------------------------------------------------------
+    //  Technical Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    public void testAnnotatePojoWithTable() {
+        final String tableName = TABLE_NAME_PREFIX + tableID;
+
+        final Class<? extends Pojo> annotatedPojoClass = annotatePojoWithTable(KEYSPACE, tableName);
+        final Table pojoTableAnnotation = annotatedPojoClass.getAnnotation(Table.class);
+        assertTrue(pojoTableAnnotation.name().contains(tableName));
+    }
+
+    @Test
+    public void testRaiseCassandraRequestsTimeouts() throws IOException {
+        // raiseCassandraRequestsTimeouts() was already called in @BeforeClass,
+        // do not change the container conf twice, just assert that it was indeed changed in the
+        // container
+        final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+        CASSANDRA_CONTAINER.copyFileFromContainer(
+                "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
+        final String configuration =
+                new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
+        assertTrue(configuration.contains("request_timeout_in_ms: 30000"));
+        assertTrue(configuration.contains("read_request_timeout_in_ms: 15000"));
+        assertTrue(configuration.contains("write_request_timeout_in_ms: 6000"));
+    }
+
+    // ------------------------------------------------------------------------
     //  Exactly-once Tests
     // ------------------------------------------------------------------------
 
@@ -520,43 +640,36 @@ public class CassandraConnectorITCase
 
     @Test
     public void testCassandraPojoAtLeastOnceSink() throws Exception {
-        session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
+        final Class<? extends Pojo> annotatedPojoClass =
+                annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
+        writePojos(annotatedPojoClass, null);
 
-        CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builderForWriting);
-        try {
-            sink.open(new Configuration());
-            for (int x = 0; x < 20; x++) {
-                sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
-            }
-        } finally {
-            sink.close();
-        }
-
-        ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
+        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
         Assert.assertEquals(20, rs.all().size());
     }
 
     @Test
     public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
-        session.execute(
-                CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE));
+        final Class<? extends Pojo> annotatedPojoClass =
+                annotatePojoWithTable("", TABLE_NAME_PREFIX + tableID);
+        writePojos(annotatedPojoClass, KEYSPACE);
+        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+        Assert.assertEquals(20, rs.all().size());
+    }
 
-        CassandraPojoSink<PojoNoAnnotatedKeyspace> sink =
-                new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builderForWriting, KEYSPACE);
+    private <T> void writePojos(Class<T> annotatedPojoClass, @Nullable String keyspace)
+            throws Exception {
+        final Constructor<T> pojoConstructor = getPojoConstructor(annotatedPojoClass);
+        CassandraPojoSink<T> sink =
+                new CassandraPojoSink<>(annotatedPojoClass, builderForWriting, null, keyspace);
         try {
             sink.open(new Configuration());
             for (int x = 0; x < 20; x++) {
-                sink.send(new PojoNoAnnotatedKeyspace(UUID.randomUUID().toString(), x, 0));
+                sink.send(pojoConstructor.newInstance(UUID.randomUUID().toString(), x, 0));
             }
-
         } finally {
             sink.close();
         }
-        ResultSet rs =
-                session.execute(
-                        SELECT_DATA_QUERY.replace(
-                                TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE));
-        Assert.assertEquals(20, rs.all().size());
     }
 
     @Test
@@ -603,9 +716,9 @@ public class CassandraConnectorITCase
 
     @Test
     public void testRetrialAndDropTables() {
-        session.execute(
-                CREATE_TABLE_QUERY.replace(
-                        TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME));
+        // should not fail with table exists upon retrial
+        // as @After method that truncate the keyspace is called upon retrials.
+        annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
         if (retrialsCount < 2) {
             retrialsCount++;
             throw new NoHostAvailableException(new HashMap<>());
@@ -615,64 +728,16 @@ public class CassandraConnectorITCase
     @Test
     public void testCassandraBatchPojoFormat() throws Exception {
 
-        session.execute(
-                CREATE_TABLE_QUERY.replace(
-                        TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME));
-
-        OutputFormat<CustomCassandraAnnotatedPojo> sink =
-                new CassandraPojoOutputFormat<>(
-                        builderForWriting,
-                        CustomCassandraAnnotatedPojo.class,
-                        () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});
+        final Class<? extends Pojo> annotatedPojoClass =
+                annotatePojoWithTable(KEYSPACE, TABLE_NAME_PREFIX + tableID);
 
-        List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos =
-                IntStream.range(0, 20)
-                        .mapToObj(
-                                x ->
-                                        new CustomCassandraAnnotatedPojo(
-                                                UUID.randomUUID().toString(), x, 0))
-                        .collect(Collectors.toList());
-        try {
-            sink.configure(new Configuration());
-            sink.open(0, 1);
-            for (CustomCassandraAnnotatedPojo customCassandraAnnotatedPojo :
-                    customCassandraAnnotatedPojos) {
-                sink.writeRecord(customCassandraAnnotatedPojo);
-            }
-        } finally {
-            sink.close();
-        }
-        ResultSet rs =
-                session.execute(
-                        SELECT_DATA_QUERY.replace(
-                                TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME));
+        final List<? extends Pojo> pojos = writePojosWithOutputFormat(annotatedPojoClass);
+        ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
         Assert.assertEquals(20, rs.all().size());
 
-        InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source =
-                new CassandraPojoInputFormat<>(
-                        SELECT_DATA_QUERY.replace(
-                                TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME),
-                        builderForReading,
-                        CustomCassandraAnnotatedPojo.class);
-        List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
-
-        try {
-            source.configure(new Configuration());
-            source.open(null);
-            while (!source.reachedEnd()) {
-                CustomCassandraAnnotatedPojo temp = source.nextRecord(null);
-                result.add(temp);
-            }
-        } finally {
-            source.close();
-        }
-
+        final List<? extends Pojo> result = readPojosWithInputFormat(annotatedPojoClass);
         Assert.assertEquals(20, result.size());
-        result.sort(Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter));
-        customCassandraAnnotatedPojos.sort(
-                Comparator.comparingInt(CustomCassandraAnnotatedPojo::getCounter));
-
-        assertThat(result, samePropertyValuesAs(customCassandraAnnotatedPojos));
+        assertThat(result, samePropertyValuesAs(pojos));
     }
 
     @Test
@@ -738,10 +803,6 @@ public class CassandraConnectorITCase
         Assert.assertEquals(rowCollection.size(), rows.size());
     }
 
-    private String injectTableName(String target) {
-        return target.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID);
-    }
-
     @Test
     public void testCassandraScalaTupleAtLeastOnceSinkBuilderDetection() throws Exception {
         Class<scala.Tuple1<String>> c =
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
index ce2c213..2efde66 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
@@ -18,12 +18,10 @@
 package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.Table;
 
 import java.io.Serializable;
 
-/** Test Pojo with DataStax annotations used. */
-@Table(keyspace = "flink", name = "test")
+/** Test Pojo with DataStax annotations created dynamically. */
 public class Pojo implements Serializable {
 
     private static final long serialVersionUID = 1038054554690916991L;
@@ -37,6 +35,9 @@ public class Pojo implements Serializable {
     @Column(name = "batch_id")
     private int batchID;
 
+    // required for deserialization
+    public Pojo() {}
+
     public Pojo(String id, int counter, int batchID) {
         this.id = id;
         this.counter = counter;