Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/04 06:42:21 UTC

[GitHub] [flink] wuchong commented on a change in pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

wuchong commented on a change in pull request #12908:
URL: https://github.com/apache/flink/pull/12908#discussion_r464809980



##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -72,26 +80,34 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
 		ReadableConfig tableOptions = helper.getOptions();
-
-		String topic = tableOptions.get(TOPIC);
 		DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
 				DeserializationFormatFactory.class,
 				FactoryUtil.FORMAT);
 		// Validate the option data type.
 		helper.validateExcept(PROPERTIES_PREFIX);
 		// Validate the option values.
-		validateTableOptions(tableOptions);
+		validateTableSourceOptions(tableOptions);
 
 		DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		final StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
+
+		final StartupOptions startupOptions = getStartupOptions(tableOptions);
+		final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+		// add topic-partition discovery
+		properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+			String.valueOf(tableOptions
+				.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
+				.map(val -> val.toMillis())

Review comment:
       ```suggestion
   				.map(Duration::toMillis)
   ```
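
    For context, a minimal sketch of the full call with the method reference applied. The `orElse` fallback for the unset case is an assumption on my side, and `java.time.Duration` needs to be imported either way:

    ```java
    properties.setProperty(
    	FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
    	String.valueOf(tableOptions
    		.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
    		.map(Duration::toMillis) // method reference instead of val -> val.toMillis()
    		// assumed fallback: disable discovery when the option is absent
    		.orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
    ```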

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -160,12 +178,45 @@ private KafkaOptions() {}
 	// Validation
 	// --------------------------------------------------------------------------------------------
 
-	public static void validateTableOptions(ReadableConfig tableOptions) {
+	public static void validateTableSourceOptions(ReadableConfig tableOptions) {
+		validateSourceTopic(tableOptions);
 		validateScanStartupMode(tableOptions);
+	}
+
+	public static void validateTableSinkOptions(ReadableConfig tableOptions) {
+		validateSinkTopic(tableOptions);
 		validateSinkPartitioner(tableOptions);
 		validateSinkSemantic(tableOptions);
 	}
 
+	public static void validateSourceTopic(ReadableConfig tableOptions) {
+		Optional<String> topic = tableOptions.getOptional(TOPIC);
+		Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
+
+		if (topic.isPresent() && pattern.isPresent()) {
+			throw new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together.");
+		}
+
+		if (!topic.isPresent() && !pattern.isPresent()) {
+			throw new ValidationException("Either 'topic' or 'topic-pattern' must be set.");
+		}
+	}
+
+	public static void validateSinkTopic(ReadableConfig tableOptions) {
+		String errorMessageTemp = "Flink Kafka sink currently only supports single topic, but got %s: %s.";
+		if (!isSingleTopic(tableOptions)) {
+			if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
+				throw new ValidationException(String.format(
+					errorMessageTemp, "'topic-pattern'", tableOptions.get(TOPIC_PATTERN)
+				));
+			} else {
+				throw new ValidationException(String.format(
+					errorMessageTemp, "topic-list", tableOptions.get(TOPIC)

Review comment:
       "topic-list" -> "topic"? We don't have "topic-list" option.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -72,26 +80,34 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
 		ReadableConfig tableOptions = helper.getOptions();
-
-		String topic = tableOptions.get(TOPIC);
 		DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
 				DeserializationFormatFactory.class,
 				FactoryUtil.FORMAT);
 		// Validate the option data type.
 		helper.validateExcept(PROPERTIES_PREFIX);
 		// Validate the option values.
-		validateTableOptions(tableOptions);
+		validateTableSourceOptions(tableOptions);
 
 		DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		final StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
+
+		final StartupOptions startupOptions = getStartupOptions(tableOptions);
+		final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+		// add topic-partition discovery
+		properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,

Review comment:
       ???

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -139,10 +152,12 @@ public void testTableSource() {
 				Thread.currentThread().getContextClassLoader());
 
 		// Test scan source equals
+		KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis", "1000");

Review comment:
       Is it still needed? We have already set it in the static block.
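
    For reference, a sketch of what that static block presumably contains (the exact contents are an assumption inferred from the property name above):

    ```java
    static {
    	KAFKA_SOURCE_PROPERTIES.setProperty(
    		"flink.partition-discovery.interval-millis", "1000");
    }
    ```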

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -168,10 +185,14 @@ protected abstract KafkaDynamicSinkBase createKafkaTableSink(
 	@Override
 	public Set<ConfigOption<?>> optionalOptions() {
 		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(TOPIC);
+		options.add(TOPIC_PATTERN);
 		options.add(PROPS_GROUP_ID);
 		options.add(SCAN_STARTUP_MODE);
 		options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+		options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
 		options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+		options.add(SCAN_TOPIC_PARTITION_DISCOVERY);

Review comment:
       Duplicate: `SCAN_TOPIC_PARTITION_DISCOVERY` is already added two lines above.
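
    A sketch of the deduplicated block, with only the repeated line dropped:

    ```java
    options.add(TOPIC);
    options.add(TOPIC_PATTERN);
    options.add(PROPS_GROUP_ID);
    options.add(SCAN_STARTUP_MODE);
    options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
    options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
    options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
    ```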

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -239,9 +293,25 @@ public static KafkaSinkSemantic getSinkSemantic(ReadableConfig tableOptions){
 		}
 	}
 
-	public static StartupOptions getStartupOptions(
-			ReadableConfig tableOptions,
-			String topic) {
+	public static List<String> getSourceTopics(ReadableConfig tableOptions) {
+		return tableOptions.getOptional(TOPIC).map(value ->
+			Arrays
+				.stream(value.split(","))
+				.map(String::trim)
+				.collect(Collectors.toList()))
+			.orElse(null);
+	}
+
+	public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) {
+		return tableOptions.getOptional(TOPIC_PATTERN).map(value -> Pattern.compile(value)).orElse(null);
+	}
+
+	private static boolean isSingleTopic(ReadableConfig tableOptions) {
+		// Option 'topic-pattern' is regarded as multi-topics.
+		return tableOptions.getOptional(TOPIC).isPresent() && tableOptions.get(TOPIC).split(",").length == 1;

Review comment:
       The community recommends using a List ConfigOption for list values; the framework will handle the parsing. This also changes the separator to `;`, but that is more aligned with the other list options. You can declare a List ConfigOption like this:
   
   ```java
   	public static final ConfigOption<List<String>> TOPIC = ConfigOptions
   			.key("topic")
   			.stringType()
   			.asList()
   			.noDefaultValue()
   			.withDescription("...");
   ```
   
   Then you can call `return tableOptions.getOptional(TOPIC).map(t -> t.size() == 1).orElse(false);` here. 
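
    Putting it together, a sketch of `isSingleTopic` on top of the List option (assuming `TOPIC` is redeclared as above):

    ```java
    private static boolean isSingleTopic(ReadableConfig tableOptions) {
    	// 'topic-pattern' is regarded as multi-topic, so only a one-element
    	// 'topic' list counts as a single topic.
    	return tableOptions.getOptional(TOPIC)
    		.map(t -> t.size() == 1)
    		.orElse(false);
    }
    ```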
   
   
   Sorry for the late reminder.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org