Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/21 10:50:40 UTC

[GitHub] [flink-connector-cassandra] zentol commented on a diff in pull request #1: [FLINK-29982] move the existing Cassandra connector to external repository

zentol commented on code in PR #1:
URL: https://github.com/apache/flink-connector-cassandra/pull/1#discussion_r1027825022


##########
src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java:
##########
@@ -0,0 +1,912 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
+import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
+import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.DockerImageVersions;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.annotations.Table;
+import net.bytebuddy.ByteBuddy;
+import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for all Cassandra sinks. */
+@SuppressWarnings("serial")
+// NoHostAvailableException is raised by the Cassandra client under load while connecting to the cluster
+@RetryOnException(times = 3, exception = NoHostAvailableException.class)
+@Testcontainers
+@ExtendWith(RetryExtension.class)
+class CassandraConnectorITCase
+        extends WriteAheadSinkTestBase<
+                Tuple3<String, Integer, Integer>,
+                CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+
+    private static final int MAX_CONNECTION_RETRY = 3;
+    private static final long CONNECTION_RETRY_DELAY = 500L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+    private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
+
+    @TempDir static Path tmpDir;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    @Container static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
+
+    private static final int PORT = 9042;
+
+    private static Cluster cluster;
+    private static Session session;
+
+    private final ClusterBuilder builderForReading =
+            createBuilderWithConsistencyLevel(ConsistencyLevel.ONE);
+    // Lower consistency level ANY is only available for writing.
+    private final ClusterBuilder builderForWriting =
+            createBuilderWithConsistencyLevel(ConsistencyLevel.ANY);
+
+    private ClusterBuilder createBuilderWithConsistencyLevel(ConsistencyLevel consistencyLevel) {
+        return new ClusterBuilder() {
+            @Override
+            protected Cluster buildCluster(Cluster.Builder builder) {
+                return builder.addContactPointsWithPorts(
+                                new InetSocketAddress(
+                                        CASSANDRA_CONTAINER.getHost(),
+                                        CASSANDRA_CONTAINER.getMappedPort(PORT)))
+                        .withQueryOptions(
+                                new QueryOptions()
+                                        .setConsistencyLevel(consistencyLevel)
+                                        .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+                        .withSocketOptions(
+                                new SocketOptions()
+                                        // default timeout x 3
+                                        .setConnectTimeoutMillis(15000)
+                                        // default timeout x 3 and higher than
+                                        // request_timeout_in_ms at the cluster level
+                                        .setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
+                        .withoutJMXReporting()
+                        .withoutMetrics()
+                        .build();
+            }
+        };
+    }
+
+    private static final String TABLE_NAME_PREFIX = "flink_";
+    private static final String TABLE_NAME_VARIABLE = "$TABLE";
+    private static final String KEYSPACE = "flink";
+    private static final String TUPLE_ID_FIELD = "id";
+    private static final String TUPLE_COUNTER_FIELD = "counter";
+    private static final String TUPLE_BATCHID_FIELD = "batch_id";
+    private static final String CREATE_KEYSPACE_QUERY =
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+    private static final String CREATE_TABLE_QUERY =
+            "CREATE TABLE "
+                    + KEYSPACE
+                    + "."
+                    + TABLE_NAME_VARIABLE
+                    + " ("
+                    + TUPLE_ID_FIELD
+                    + " text PRIMARY KEY, "
+                    + TUPLE_COUNTER_FIELD
+                    + " int, "
+                    + TUPLE_BATCHID_FIELD
+                    + " int);";
+    private static final String INSERT_DATA_QUERY =
+            "INSERT INTO "
+                    + KEYSPACE
+                    + "."
+                    + TABLE_NAME_VARIABLE
+                    + " ("
+                    + TUPLE_ID_FIELD
+                    + ", "
+                    + TUPLE_COUNTER_FIELD
+                    + ", "
+                    + TUPLE_BATCHID_FIELD
+                    + ") VALUES (?, ?, ?)";
+    private static final String SELECT_DATA_QUERY =
+            "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_VARIABLE + ';';
+
+    private static final Random random = new Random();
+    private int tableID;
+
+    private static final ArrayList<Tuple3<String, Integer, Integer>> collection =
+            new ArrayList<>(20);
+    private static final ArrayList<Row> rowCollection = new ArrayList<>(20);
+
+    private static final TypeInformation[] FIELD_TYPES = {
+        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+    };
+
+    static {
+        for (int i = 0; i < 20; i++) {
+            collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
+            rowCollection.add(Row.of(UUID.randomUUID().toString(), i, 0));
+        }
+    }
+
+    private static Class<? extends Pojo> annotatePojoWithTable(String keyspace, String tableName) {
+        return new ByteBuddy()
+                .redefine(Pojo.class)
+                .name("org.apache.flink.streaming.connectors.cassandra.Pojo" + tableName)
+                .annotateType(createTableAnnotation(keyspace, tableName))
+                .make()
+                .load(Pojo.class.getClassLoader())
+                .getLoaded();
+    }
+
+    @NotNull
+    private static Table createTableAnnotation(String keyspace, String tableName) {
+        return new Table() {
+
+            @Override
+            public String keyspace() {
+                return keyspace;
+            }
+
+            @Override
+            public String name() {
+                return tableName;
+            }
+
+            @Override
+            public boolean caseSensitiveKeyspace() {
+                return false;
+            }
+
+            @Override
+            public boolean caseSensitiveTable() {
+                return false;
+            }
+
+            @Override
+            public String writeConsistency() {
+                return "";
+            }
+
+            @Override
+            public String readConsistency() {
+                return "";
+            }
+
+            @Override
+            public Class<? extends Annotation> annotationType() {
+                return Table.class;
+            }
+        };
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utility methods
+    // ------------------------------------------------------------------------
+
+    public static CassandraContainer createCassandraContainer() {
+        CassandraContainer cassandra = new CassandraContainer(DockerImageVersions.CASSANDRA_4_0);

Review Comment:
   Don't rely on the Flink DockerImageVersions; this entry will be removed eventually.
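
   One way to act on this, sketched under stated assumptions (the local constant name and the image tag below are illustrative, not necessarily what the PR ultimately used), is to pin the image in the connector's own test code rather than importing Flink's `org.apache.flink.util.DockerImageVersions`:

   ```java
   // Hypothetical local replacement for DockerImageVersions.CASSANDRA_4_0;
   // the tag is an assumption and should match the Cassandra version CI targets.
   private static final String CASSANDRA_DOCKER_IMAGE = "cassandra:4.0";

   public static CassandraContainer createCassandraContainer() {
       // Container setup otherwise unchanged; the connector no longer depends
       // on Flink's DockerImageVersions constants for the image name.
       return new CassandraContainer(CASSANDRA_DOCKER_IMAGE);
   }
   ```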



##########
pom.xml:
##########
@@ -0,0 +1,432 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>io.github.zentol.flink</groupId>
+		<artifactId>flink-connector-parent</artifactId>
+		<version>1.0</version>
+	</parent>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
+	<version>4.0-SNAPSHOT</version>

Review Comment:
   ```suggestion
   	<version>3.0-SNAPSHOT</version>
   ```
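
   A likely rationale, not stated in the comment: connectors externalized from the Flink repository adopt their own version lines decoupled from Flink's, and the Cassandra connector's first release from this repository was targeted at 3.0.0, so the development version belongs on 3.0-SNAPSHOT.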


