You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/09 14:29:16 UTC
[flink] branch release-1.11 updated: [FLINK-18534][kafka][table]
Fix unstable KafkaTableITCase.testKafkaDebeziumChangelogSource
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 9247610 [FLINK-18534][kafka][table] Fix unstable KafkaTableITCase.testKafkaDebeziumChangelogSource
9247610 is described below
commit 9247610ebe56bd5d1187ad2e9dc3afe3c93b584e
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jul 9 11:53:20 2020 +0800
[FLINK-18534][kafka][table] Fix unstable KafkaTableITCase.testKafkaDebeziumChangelogSource
This fix the unstable test with "Topic 'changelog_topic' already exists" exception
---
...eITCase.java => KafkaChangelogTableITCase.java} | 50 ++++---
.../connectors/kafka/table/KafkaTableITCase.java | 155 ---------------------
2 files changed, 31 insertions(+), 174 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
similarity index 84%
copy from flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
copy to flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 17fbe48..7b9b069 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -18,16 +18,21 @@
package org.apache.flink.streaming.connectors.kafka.table;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestBaseWithFlink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
@@ -38,30 +43,37 @@ import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase.isCausedByJobFinished;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
/**
- * IT cases for Kafka for Table API & SQL.
+ * IT cases for Kafka with changelog format for Table API & SQL.
*/
-public class KafkaTableITCase extends KafkaTableTestBase {
-
- @Override
- public String factoryIdentifier() {
- return KafkaDynamicTableFactory.IDENTIFIER;
- }
-
- @Override
- public String kafkaVersion() {
- return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
+public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
+
+ protected StreamExecutionEnvironment env;
+ protected StreamTableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ tEnv = StreamTableEnvironment.create(
+ env,
+ EnvironmentSettings.newInstance()
+ // Watermark is only supported in blink planner
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build()
+ );
+ env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ // we have to use single parallelism,
+ // because we will count the messages in sink to terminate the job
+ env.setParallelism(1);
}
@Test
public void testKafkaDebeziumChangelogSource() throws Exception {
- if (isLegacyConnector) {
- // skip tests for legacy connector, because changelog source is only supported in new connector
- return;
- }
final String topic = "changelog_topic";
createTestTopic(topic, 1, 1);
@@ -92,12 +104,12 @@ public class KafkaTableITCase extends KafkaTableTestBase {
" description STRING," +
" weight DECIMAL(10,3)" +
") WITH (" +
- " 'connector' = '%s'," +
+ " 'connector' = 'kafka'," +
" 'topic' = '%s'," +
" 'properties.bootstrap.servers' = '%s'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'debezium-json'" +
- ")", factoryIdentifier(), topic, bootstraps);
+ ")", topic, bootstraps);
String sinkDDL = "CREATE TABLE sink (" +
" name STRING," +
" weightSum DECIMAL(10,3)," +
@@ -184,7 +196,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
// --------------------------------------------------------------------------------------------
private static List<String> readLines(String resource) throws IOException {
- final URL url = KafkaTableITCase.class.getClassLoader().getResource(resource);
+ final URL url = KafkaChangelogTableITCase.class.getClassLoader().getResource(resource);
assert url != null;
Path path = new File(url.getFile()).toPath();
return Files.readAllLines(path);
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 17fbe48..58205dd 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -18,28 +18,7 @@
package org.apache.flink.streaming.connectors.kafka.table;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
-
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.Properties;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
/**
* IT cases for Kafka for Table API & SQL.
@@ -55,138 +34,4 @@ public class KafkaTableITCase extends KafkaTableTestBase {
public String kafkaVersion() {
return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
}
-
- @Test
- public void testKafkaDebeziumChangelogSource() throws Exception {
- if (isLegacyConnector) {
- // skip tests for legacy connector, because changelog source is only supported in new connector
- return;
- }
- final String topic = "changelog_topic";
- createTestTopic(topic, 1, 1);
-
- // ---------- Write the Debezium json into Kafka -------------------
- List<String> lines = readLines("debezium-data-schema-exclude.txt");
- DataStreamSource<String> stream = env.fromCollection(lines);
- SerializationSchema<String> serSchema = new SimpleStringSchema();
- FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
- // the producer must not produce duplicates
- Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
- producerProperties.setProperty("retries", "0");
- producerProperties.putAll(secureProps);
- kafkaServer.produceIntoKafka(stream, topic, serSchema, producerProperties, partitioner);
- try {
- env.execute("Write sequence");
- }
- catch (Exception e) {
- throw new Exception("Failed to write debezium data to Kafka.", e);
- }
-
- // ---------- Produce an event time stream into Kafka -------------------
- String bootstraps = standardProps.getProperty("bootstrap.servers");
- String sourceDDL = String.format(
- "CREATE TABLE debezium_source (" +
- " id INT NOT NULL," +
- " name STRING," +
- " description STRING," +
- " weight DECIMAL(10,3)" +
- ") WITH (" +
- " 'connector' = '%s'," +
- " 'topic' = '%s'," +
- " 'properties.bootstrap.servers' = '%s'," +
- " 'scan.startup.mode' = 'earliest-offset'," +
- " 'format' = 'debezium-json'" +
- ")", factoryIdentifier(), topic, bootstraps);
- String sinkDDL = "CREATE TABLE sink (" +
- " name STRING," +
- " weightSum DECIMAL(10,3)," +
- " PRIMARY KEY (name) NOT ENFORCED" +
- ") WITH (" +
- " 'connector' = 'values'," +
- " 'sink-insert-only' = 'false'," +
- " 'sink-expected-messages-num' = '20'" +
- ")";
- tEnv.executeSql(sourceDDL);
- tEnv.executeSql(sinkDDL);
-
- try {
- TableEnvUtil.execInsertSqlAndWaitResult(
- tEnv,
- "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
- } catch (Throwable t) {
- // we have to use a specific exception to indicate the job is finished,
- // because the registered Kafka source is infinite.
- if (!isCausedByJobFinished(t)) {
- // re-throw
- throw t;
- }
- }
-
- // Debezium captures change data on the `products` table:
- //
- // CREATE TABLE products (
- // id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
- // name VARCHAR(255),
- // description VARCHAR(512),
- // weight FLOAT
- // );
- // ALTER TABLE products AUTO_INCREMENT = 101;
- //
- // INSERT INTO products
- // VALUES (default,"scooter","Small 2-wheel scooter",3.14),
- // (default,"car battery","12V car battery",8.1),
- // (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
- // (default,"hammer","12oz carpenter's hammer",0.75),
- // (default,"hammer","14oz carpenter's hammer",0.875),
- // (default,"hammer","16oz carpenter's hammer",1.0),
- // (default,"rocks","box of assorted rocks",5.3),
- // (default,"jacket","water resistent black wind breaker",0.1),
- // (default,"spare tire","24 inch spare tire",22.2);
- // UPDATE products SET description='18oz carpenter hammer' WHERE id=106;
- // UPDATE products SET weight='5.1' WHERE id=107;
- // INSERT INTO products VALUES (default,"jacket","water resistent white wind breaker",0.2);
- // INSERT INTO products VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
- // UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;
- // UPDATE products SET weight='5.17' WHERE id=111;
- // DELETE FROM products WHERE id=111;
- //
- // > SELECT * FROM products;
- // +-----+--------------------+---------------------------------------------------------+--------+
- // | id | name | description | weight |
- // +-----+--------------------+---------------------------------------------------------+--------+
- // | 101 | scooter | Small 2-wheel scooter | 3.14 |
- // | 102 | car battery | 12V car battery | 8.1 |
- // | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
- // | 104 | hammer | 12oz carpenter's hammer | 0.75 |
- // | 105 | hammer | 14oz carpenter's hammer | 0.875 |
- // | 106 | hammer | 18oz carpenter hammer | 1 |
- // | 107 | rocks | box of assorted rocks | 5.1 |
- // | 108 | jacket | water resistent black wind breaker | 0.1 |
- // | 109 | spare tire | 24 inch spare tire | 22.2 |
- // | 110 | jacket | new water resistent white wind breaker | 0.5 |
- // +-----+--------------------+---------------------------------------------------------+--------+
-
- String[] expected = new String[]{
- "scooter,3.140", "car battery,8.100", "12-pack drill bits,0.800",
- "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"};
-
- List<String> actual = TestValuesTableFactory.getResults("sink");
- assertThat(actual, containsInAnyOrder(expected));
-
- // ------------- cleanup -------------------
-
- deleteTestTopic(topic);
- }
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
- private static List<String> readLines(String resource) throws IOException {
- final URL url = KafkaTableITCase.class.getClassLoader().getResource(resource);
- assert url != null;
- Path path = new File(url.getFile()).toPath();
- return Files.readAllLines(path);
- }
}