Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/09 14:29:16 UTC

[flink] branch release-1.11 updated: [FLINK-18534][kafka][table] Fix unstable KafkaTableITCase.testKafkaDebeziumChangelogSource

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 9247610  [FLINK-18534][kafka][table] Fix unstable KafkaTableITCase.testKafkaDebeziumChangelogSource
9247610 is described below

commit 9247610ebe56bd5d1187ad2e9dc3afe3c93b584e
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jul 9 11:53:20 2020 +0800

    [FLINK-18534][kafka][table] Fix unstable KafkaTableITCase.testKafkaDebeziumChangelogSource
    
    This fixes the unstable test, which failed with a "Topic 'changelog_topic' already exists" exception
---
 ...eITCase.java => KafkaChangelogTableITCase.java} |  50 ++++---
 .../connectors/kafka/table/KafkaTableITCase.java   | 155 ---------------------
 2 files changed, 31 insertions(+), 174 deletions(-)
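
For context, the failure mode is the Kafka broker rejecting the creation of a topic name that still exists, e.g. left over from an earlier run (topic deletion on the broker is asynchronous). The following is a minimal, self-contained sketch of that behaviour using the plain Kafka AdminClient; it is illustrative only and not taken from the Flink test base, and the bootstrap address is a placeholder (the ITCase reads the real one from standardProps):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.errors.TopicExistsException;

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    public class TopicExistsDemo {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // placeholder address; the ITCase gets the real one from standardProps
            props.put("bootstrap.servers", "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                NewTopic topic = new NewTopic("changelog_topic", 1, (short) 1);

                // the first creation succeeds
                admin.createTopics(Collections.singletonList(topic)).all().get();

                // a second creation of the same name fails while the topic
                // still exists on the broker
                try {
                    admin.createTopics(Collections.singletonList(topic)).all().get();
                } catch (ExecutionException e) {
                    // cause: TopicExistsException("Topic 'changelog_topic' already exists")
                    if (!(e.getCause() instanceof TopicExistsException)) {
                        throw e;
                    }
                }
            }
        }
    }
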

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
similarity index 84%
copy from flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
copy to flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 17fbe48..7b9b069 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -18,16 +18,21 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestBaseWithFlink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
@@ -38,30 +43,37 @@ import java.nio.file.Path;
 import java.util.List;
 import java.util.Properties;
 
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase.isCausedByJobFinished;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
 /**
- * IT cases for Kafka for Table API & SQL.
+ * IT cases for Kafka with changelog format for Table API & SQL.
  */
-public class KafkaTableITCase extends KafkaTableTestBase {
-
-	@Override
-	public String factoryIdentifier() {
-		return KafkaDynamicTableFactory.IDENTIFIER;
-	}
-
-	@Override
-	public String kafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
+public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
+
+	protected StreamExecutionEnvironment env;
+	protected StreamTableEnvironment tEnv;
+
+	@Before
+	public void setup() {
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
+		tEnv = StreamTableEnvironment.create(
+			env,
+			EnvironmentSettings.newInstance()
+				// Watermark is only supported in blink planner
+				.useBlinkPlanner()
+				.inStreamingMode()
+				.build()
+		);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		// we have to use single parallelism,
+		// because we will count the messages in sink to terminate the job
+		env.setParallelism(1);
 	}
 
 	@Test
 	public void testKafkaDebeziumChangelogSource() throws Exception {
-		if (isLegacyConnector) {
-			// skip tests for legacy connector, because changelog source is only supported in new connector
-			return;
-		}
 		final String topic = "changelog_topic";
 		createTestTopic(topic, 1, 1);
 
@@ -92,12 +104,12 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 			" description STRING," +
 			" weight DECIMAL(10,3)" +
 			") WITH (" +
-			" 'connector' = '%s'," +
+			" 'connector' = 'kafka'," +
 			" 'topic' = '%s'," +
 			" 'properties.bootstrap.servers' = '%s'," +
 			" 'scan.startup.mode' = 'earliest-offset'," +
 			" 'format' = 'debezium-json'" +
-			")", factoryIdentifier(), topic, bootstraps);
+			")", topic, bootstraps);
 		String sinkDDL = "CREATE TABLE sink (" +
 			" name STRING," +
 			" weightSum DECIMAL(10,3)," +
@@ -184,7 +196,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 	// --------------------------------------------------------------------------------------------
 
 	private static List<String> readLines(String resource) throws IOException {
-		final URL url = KafkaTableITCase.class.getClassLoader().getResource(resource);
+		final URL url = KafkaChangelogTableITCase.class.getClassLoader().getResource(resource);
 		assert url != null;
 		Path path = new File(url.getFile()).toPath();
 		return Files.readAllLines(path);
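
Why the move likely fixes the instability: judging from the factoryIdentifier()/kafkaVersion() hooks and the isLegacyConnector guard removed above, the old KafkaTableITCase runs its tests once per connector variant against the same broker, so the hard-coded topic name "changelog_topic" could be hit again while a topic from an earlier run was still being deleted. The new KafkaChangelogTableITCase extends KafkaTestBaseWithFlink directly and is not parameterized, so the topic is created and deleted exactly once. An alternative hardening, sketched here only for illustration and not part of this commit, would be a per-run unique topic name (createTestTopic/deleteTestTopic are the helpers already used in the test):

    import java.util.UUID;

    // Hypothetical hardening (not what this commit does): a per-run topic
    // name can never collide with a topic lingering from an earlier run.
    String topic = "changelog_topic_" + UUID.randomUUID();
    createTestTopic(topic, 1, 1);
    try {
        // ... write Debezium data, run the SQL job, assert results ...
    } finally {
        deleteTestTopic(topic); // clean up even when the assertion fails
    }
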
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 17fbe48..58205dd 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -18,28 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
-
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.Properties;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
 
 /**
  * IT cases for Kafka for Table API & SQL.
@@ -55,138 +34,4 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 	public String kafkaVersion() {
 		return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
 	}
-
-	@Test
-	public void testKafkaDebeziumChangelogSource() throws Exception {
-		if (isLegacyConnector) {
-			// skip tests for legacy connector, because changelog source is only supported in new connector
-			return;
-		}
-		final String topic = "changelog_topic";
-		createTestTopic(topic, 1, 1);
-
-		// ---------- Write the Debezium json into Kafka -------------------
-		List<String> lines = readLines("debezium-data-schema-exclude.txt");
-		DataStreamSource<String> stream = env.fromCollection(lines);
-		SerializationSchema<String> serSchema = new SimpleStringSchema();
-		FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
-		// the producer must not produce duplicates
-		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-		producerProperties.setProperty("retries", "0");
-		producerProperties.putAll(secureProps);
-		kafkaServer.produceIntoKafka(stream, topic, serSchema, producerProperties, partitioner);
-		try {
-			env.execute("Write sequence");
-		}
-		catch (Exception e) {
-			throw new Exception("Failed to write debezium data to Kafka.", e);
-		}
-
-		// ---------- Produce an event time stream into Kafka -------------------
-		String bootstraps = standardProps.getProperty("bootstrap.servers");
-		String sourceDDL = String.format(
-			"CREATE TABLE debezium_source (" +
-			" id INT NOT NULL," +
-			" name STRING," +
-			" description STRING," +
-			" weight DECIMAL(10,3)" +
-			") WITH (" +
-			" 'connector' = '%s'," +
-			" 'topic' = '%s'," +
-			" 'properties.bootstrap.servers' = '%s'," +
-			" 'scan.startup.mode' = 'earliest-offset'," +
-			" 'format' = 'debezium-json'" +
-			")", factoryIdentifier(), topic, bootstraps);
-		String sinkDDL = "CREATE TABLE sink (" +
-			" name STRING," +
-			" weightSum DECIMAL(10,3)," +
-			" PRIMARY KEY (name) NOT ENFORCED" +
-			") WITH (" +
-			" 'connector' = 'values'," +
-			" 'sink-insert-only' = 'false'," +
-			" 'sink-expected-messages-num' = '20'" +
-			")";
-		tEnv.executeSql(sourceDDL);
-		tEnv.executeSql(sinkDDL);
-
-		try {
-			TableEnvUtil.execInsertSqlAndWaitResult(
-				tEnv,
-				"INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
-		} catch (Throwable t) {
-			// we have to use a specific exception to indicate the job is finished,
-			// because the registered Kafka source is infinite.
-			if (!isCausedByJobFinished(t)) {
-				// re-throw
-				throw t;
-			}
-		}
-
-		// Debezium captures change data on the `products` table:
-		//
-		// CREATE TABLE products (
-		//  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
-		//  name VARCHAR(255),
-		//  description VARCHAR(512),
-		//  weight FLOAT
-		// );
-		// ALTER TABLE products AUTO_INCREMENT = 101;
-		//
-		// INSERT INTO products
-		// VALUES (default,"scooter","Small 2-wheel scooter",3.14),
-		//        (default,"car battery","12V car battery",8.1),
-		//        (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
-		//        (default,"hammer","12oz carpenter's hammer",0.75),
-		//        (default,"hammer","14oz carpenter's hammer",0.875),
-		//        (default,"hammer","16oz carpenter's hammer",1.0),
-		//        (default,"rocks","box of assorted rocks",5.3),
-		//        (default,"jacket","water resistent black wind breaker",0.1),
-		//        (default,"spare tire","24 inch spare tire",22.2);
-		// UPDATE products SET description='18oz carpenter hammer' WHERE id=106;
-		// UPDATE products SET weight='5.1' WHERE id=107;
-		// INSERT INTO products VALUES (default,"jacket","water resistent white wind breaker",0.2);
-		// INSERT INTO products VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
-		// UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;
-		// UPDATE products SET weight='5.17' WHERE id=111;
-		// DELETE FROM products WHERE id=111;
-		//
-		// > SELECT * FROM products;
-		// +-----+--------------------+---------------------------------------------------------+--------+
-		// | id  | name               | description                                             | weight |
-		// +-----+--------------------+---------------------------------------------------------+--------+
-		// | 101 | scooter            | Small 2-wheel scooter                                   |   3.14 |
-		// | 102 | car battery        | 12V car battery                                         |    8.1 |
-		// | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |    0.8 |
-		// | 104 | hammer             | 12oz carpenter's hammer                                 |   0.75 |
-		// | 105 | hammer             | 14oz carpenter's hammer                                 |  0.875 |
-		// | 106 | hammer             | 18oz carpenter hammer                                   |      1 |
-		// | 107 | rocks              | box of assorted rocks                                   |    5.1 |
-		// | 108 | jacket             | water resistent black wind breaker                      |    0.1 |
-		// | 109 | spare tire         | 24 inch spare tire                                      |   22.2 |
-		// | 110 | jacket             | new water resistent white wind breaker                  |    0.5 |
-		// +-----+--------------------+---------------------------------------------------------+--------+
-
-		String[] expected = new String[]{
-			"scooter,3.140", "car battery,8.100", "12-pack drill bits,0.800",
-			"hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"};
-
-		List<String> actual = TestValuesTableFactory.getResults("sink");
-		assertThat(actual, containsInAnyOrder(expected));
-
-		// ------------- cleanup -------------------
-
-		deleteTestTopic(topic);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Utilities
-	// --------------------------------------------------------------------------------------------
-
-	private static List<String> readLines(String resource) throws IOException {
-		final URL url = KafkaTableITCase.class.getClassLoader().getResource(resource);
-		assert url != null;
-		Path path = new File(url.getFile()).toPath();
-		return Files.readAllLines(path);
-	}
 }