You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Shengkai Fang (Jira)" <ji...@apache.org> on 2021/06/11 02:53:00 UTC
[jira] [Created] (FLINK-22969) Validate the topic is not null or
empty string when create kafka source/sink function
Shengkai Fang created FLINK-22969:
-------------------------------------
Summary: Validate the topic is not null or empty string when create kafka source/sink function
Key: FLINK-22969
URL: https://issues.apache.org/jira/browse/FLINK-22969
Project: Flink
Issue Type: Bug
Reporter: Shengkai Fang
Add test in UpsertKafkaTableITCase
{code:java}
@Test
public void testSourceSinkWithKeyAndPartialValue() throws Exception {
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "key_partial_value_topic_" + format;
createTestTopic(topic, 1, 1); // use single partition to guarantee orders in tests
// ---------- Produce an event time stream into Kafka -------------------
String bootstraps = standardProps.getProperty("bootstrap.servers");
// k_user_id and user_id have different data types to verify the correct mapping,
// fields are reordered on purpose
final String createTable =
String.format(
"CREATE TABLE upsert_kafka (\n"
+ " `k_user_id` BIGINT,\n"
+ " `name` STRING,\n"
+ " `timestamp` TIMESTAMP(3) METADATA,\n"
+ " `k_event_id` BIGINT,\n"
+ " `user_id` INT,\n"
+ " `payload` STRING,\n"
+ " PRIMARY KEY (k_event_id, k_user_id) NOT ENFORCED"
+ ") WITH (\n"
+ " 'connector' = 'upsert-kafka',\n"
+ " 'topic' = '%s',\n"
+ " 'properties.bootstrap.servers' = '%s',\n"
+ " 'key.format' = '%s',\n"
+ " 'key.fields-prefix' = 'k_',\n"
+ " 'value.format' = '%s',\n"
+ " 'value.fields-include' = 'EXCEPT_KEY'\n"
+ ")",
"", bootstraps, format, format);
tEnv.executeSql(createTable);
String initialValues =
"INSERT INTO upsert_kafka\n"
+ "VALUES\n"
+ " (1, 'name 1', TIMESTAMP '2020-03-08 13:12:11.123', 100, 41, 'payload 1'),\n"
+ " (2, 'name 2', TIMESTAMP '2020-03-09 13:12:11.123', 101, 42, 'payload 2'),\n"
+ " (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 43, 'payload 3'),\n"
+ " (2, 'name 2', TIMESTAMP '2020-03-11 13:12:11.123', 101, 42, 'payload')";
tEnv.executeSql(initialValues).await();
// ---------- Consume stream from Kafka -------------------
final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 5);
final List<Row> expected =
Arrays.asList(
changelogRow(
"+I",
1L,
"name 1",
LocalDateTime.parse("2020-03-08T13:12:11.123"),
100L,
41,
"payload 1"),
changelogRow(
"+I",
2L,
"name 2",
LocalDateTime.parse("2020-03-09T13:12:11.123"),
101L,
42,
"payload 2"),
changelogRow(
"+I",
3L,
"name 3",
LocalDateTime.parse("2020-03-10T13:12:11.123"),
102L,
43,
"payload 3"),
changelogRow(
"-U",
2L,
"name 2",
LocalDateTime.parse("2020-03-09T13:12:11.123"),
101L,
42,
"payload 2"),
changelogRow(
"+U",
2L,
"name 2",
LocalDateTime.parse("2020-03-11T13:12:11.123"),
101L,
42,
"payload"));
assertThat(result, deepEqualTo(expected, true));
// ------------- cleanup -------------------
deleteTestTopic(topic);
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)