Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/15 13:03:12 UTC

[GitHub] [flink] fsk119 opened a new pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

fsk119 opened a new pull request #12908:
URL: https://github.com/apache/flink/pull/12908


   …ry dynamically in table api
   
   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   *Enable Kafka connector topic discovery & partition discovery in the Table API.*
   
   
   ## Brief change log
   
     - *Expose the options 'topic-pattern' and 'scan.topic-partition-discovery.interval' (a short usage sketch follows this list).*
     - *Add validation that rejects setting 'topic' and 'topic-pattern' together for a source, and setting 'topic-pattern' for a sink.*
     - *Read the values from the table options and use them to build the Kafka consumer.*
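
   Purely as illustration, a minimal sketch of how the new options could be used from the Table API (the table name, schema, broker address, and format below are illustrative and not part of this change):

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class KafkaTopicDiscoveryExample {
   	public static void main(String[] args) {
   		TableEnvironment tEnv = TableEnvironment.create(
   			EnvironmentSettings.newInstance().inStreamingMode().build());

   		// Subscribe to every topic matching the pattern and periodically
   		// discover newly created topics and partitions.
   		tEnv.executeSql(
   			"CREATE TABLE kafka_source (\n"
   			+ "  user_id BIGINT,\n"
   			+ "  message STRING\n"
   			+ ") WITH (\n"
   			+ "  'connector' = 'kafka',\n"
   			+ "  'topic-pattern' = 'test-topic-[0-9]',\n"
   			+ "  'properties.bootstrap.servers' = 'localhost:9092',\n"
   			+ "  'properties.group.id' = 'demo-group',\n"
   			+ "  'scan.startup.mode' = 'earliest-offset',\n"
   			+ "  'scan.topic-partition-discovery.interval' = '30 s',\n"
   			+ "  'format' = 'json'\n"
   			+ ")");
   	}
   }
   ```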
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added integration tests for new features*
     - *Added tests that validate that setting 'topic' and 'topic-pattern' together fails for a source, and that setting 'topic-pattern' for a sink fails.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong merged pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
wuchong merged pull request #12908:
URL: https://github.com/apache/flink/pull/12908


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537",
       "triggerID" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4984",
       "triggerID" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4991",
       "triggerID" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4994",
       "triggerID" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5025",
       "triggerID" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5205",
       "triggerID" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5025) 
   * 4b06551a39ce8d9d6c5f19948e36d493e3d592e3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5205) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537",
       "triggerID" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1a4e5ee2a2f11ba650a98c1a211cea75736fe79d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-668505104


   Btw, could you add an integration test for this? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537",
       "triggerID" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4984",
       "triggerID" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4991",
       "triggerID" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4994",
       "triggerID" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5025",
       "triggerID" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5025) 
   * 4b06551a39ce8d9d6c5f19948e36d493e3d592e3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12908:
URL: https://github.com/apache/flink/pull/12908#discussion_r460723655



##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
##########
@@ -73,17 +80,26 @@ public Kafka010DynamicSource(
 
 	@Override
 	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema) {
-		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+		return new FlinkKafkaConsumer010<>(topics, deserializationSchema, properties);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern pattern,
+		Properties properties,
+		DeserializationSchema<RowData> deserializationSchema) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -161,11 +176,22 @@ private KafkaOptions() {}
 	// --------------------------------------------------------------------------------------------
 
 	public static void validateTableOptions(ReadableConfig tableOptions) {
+		validateTopic(tableOptions);
 		validateScanStartupMode(tableOptions);
 		validateSinkPartitioner(tableOptions);
 		validateSinkSemantic(tableOptions);
 	}
 
+	public static void validateTopic(ReadableConfig tableOptions) {
+		Optional<String> topic = tableOptions.getOptional(TOPIC);
+		Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
+
+		if ((topic.isPresent() && pattern.isPresent()) || !(topic.isPresent() || pattern.isPresent())) {

Review comment:
       Split this into two exceptions. 

##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
##########
@@ -73,17 +80,26 @@ public Kafka010DynamicSource(
 
 	@Override
 	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema) {
-		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+		return new FlinkKafkaConsumer010<>(topics, deserializationSchema, properties);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern pattern,

Review comment:
       topicPattern

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
##########
@@ -164,34 +178,44 @@ public int hashCode() {
 	/**
 	 * Creates a version-specific Kafka consumer.
 	 *
-	 * @param topic                 Kafka topic to consume.
+	 * @param topics                Kafka topic to consume.
 	 * @param properties            Properties for the Kafka consumer.
 	 * @param deserializationSchema Deserialization schema to use for Kafka records.
 	 * @return The version-specific Kafka consumer
 	 */
 	protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema);
 
+	/**
+	 * Creates a version-specific Kafka consumer.
+	 *
+	 * @param topicPattern          Kafka topic pattern to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @return The version-specific Kafka consumer
+	 */
+	protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern topicPattern,
+		Properties properties,
+		DeserializationSchema<RowData> deserializationSchema);

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
##########
@@ -93,16 +101,21 @@
 	 *                               mode is {@link StartupMode#TIMESTAMP}.
 	 */
 	protected KafkaDynamicSourceBase(
-			DataType outputDataType,
-			String topic,
-			Properties properties,
-			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets,
-			long startupTimestampMillis) {
+		DataType outputDataType,
+		@Nullable List<String> topics,
+		@Nullable Pattern topicPattern,
+		Properties properties,
+		DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
+		StartupMode startupMode,
+		Map<KafkaTopicPartition, Long> specificStartupOffsets,
+		long startupTimestampMillis) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSource.java
##########
@@ -73,17 +80,26 @@ public Kafka011DynamicSource(
 
 	@Override
 	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema) {
-		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
+		return new FlinkKafkaConsumer011<>(topics, deserializationSchema, properties);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern topicPattern,
+		Properties properties,
+		DeserializationSchema<RowData> deserializationSchema) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -70,17 +77,26 @@ public KafkaDynamicSource(
 
 	@Override
 	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema) {
-		return new FlinkKafkaConsumer<>(topic, deserializationSchema, properties);
+		return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern topicPattern,
+		Properties properties,
+		DeserializationSchema<RowData> deserializationSchema) {

Review comment:
       Indent.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name from which the table is read.</td>
+      <td>Topic name(s) from which the table is read. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>.</td>
     </tr>
+    <tr>
+      <td><h5>topic-pattern</h5></td>
+      <td>optional for source(use 'topic' instead if not set)</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic pattern from which the table is read. It will use input value to build regex expression to discover matched topics.</td>

Review comment:
       The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources. 

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -109,10 +133,18 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 		// Validate the option values.
 		validateTableOptions(tableOptions);
 
+		if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()){
+			throw new ValidationException("Flink Kafka sink currently doesn't support 'topic-pattern'.");
+		}
+		String[] topics = tableOptions.get(TOPIC).split(",");
+		if (topics.length > 1) {
+			throw new ValidationException("Flink Kafka sink currently doesn't support topic list.");
+		}

Review comment:
       What about having a `validateTableSinkOptions` and a `validateTableSourceOptions`? We can then move this validation into `validateSinkTopic()`.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -193,6 +219,9 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) {
 									SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
 									SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
 						}
+						if (!tableOptions.getOptional(TOPIC).isPresent() || tableOptions.get(TOPIC).split(",").length > 1){

Review comment:
       Add a util method `boolean isSingleTopic(ReadableConfig)`. It can also be used on the sink side.
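
    A minimal sketch of such a helper, assuming the current comma-separated `topic` option (not necessarily the final shape):

    ```java
    	private static boolean isSingleTopic(ReadableConfig tableOptions) {
    		// 'topic-pattern' is always treated as potentially matching multiple topics.
    		return tableOptions.getOptional(TOPIC).isPresent()
    			&& tableOptions.get(TOPIC).split(",").length == 1;
    	}
    ```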
   

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name from which the table is read.</td>
+      <td>Topic name(s) from which the table is read. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>.</td>
     </tr>
+    <tr>
+      <td><h5>topic-pattern</h5></td>
+      <td>optional for source(use 'topic' instead if not set)</td>

Review comment:
       We can simplify this `optional`.

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -187,6 +197,62 @@ public void testTableSourceCommitOnCheckpointsDisabled() {
 		assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
 	}
 
+	@Test
+	public void testTableSourceWithPattern() {
+		// prepare parameters for Kafka table source
+		final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+
+		DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+			new TestFormatFactory.DecodingFormatMock(",", true);
+
+		// Construct table source using options and table source factory
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"scanTable");
+
+		final Map<String, String> modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.remove("topic");
+				options.put("topic-pattern", TOPIC_REGEX);
+				options.put("scan.startup.mode", KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST);
+				options.remove("scan.startup.specific-offsets");
+			});
+		CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
+
+		final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+			objectIdentifier,
+			catalogTable,
+			new Configuration(),
+			Thread.currentThread().getContextClassLoader());
+
+		// Test scan source equals
+		final KafkaDynamicSourceBase expectedKafkaSource = getExpectedScanSource(
+			producedDataType,
+			null,
+			Pattern.compile(TOPIC_REGEX),
+			KAFKA_PROPERTIES,
+			decodingFormat,
+			StartupMode.EARLIEST,
+			specificOffsets,

Review comment:
       Better to use `new HashMap<>()`.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name from which the table is read.</td>
+      <td>Topic name(s) from which the table is read. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>.</td>

Review comment:
       ```suggestion
         <td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.</td>
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -177,6 +191,14 @@ Connector Options
 
 Features
 ----------------
+### Topic and Partition Discovery
+
+The config option `topic` and `topic-pattern` specifies the topics or topic pattern to consume for source. The config option `topic` can accept topic list by inputting value like 'topic-1, topic-2'. 

Review comment:
       ```suggestion
    The config options `topic` and `topic-pattern` specify the topics or topic pattern to consume for the source. The config option `topic` can accept a topic list using a comma separator like 'topic-1, topic-2'. 
   ```

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -72,26 +82,41 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
 		ReadableConfig tableOptions = helper.getOptions();
-
-		String topic = tableOptions.get(TOPIC);
 		DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
 				DeserializationFormatFactory.class,
 				FactoryUtil.FORMAT);
+		Optional<String> topic = tableOptions.getOptional(TOPIC);
+		Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
 		// Validate the option data type.
 		helper.validateExcept(PROPERTIES_PREFIX);
 		// Validate the option values.
 		validateTableOptions(tableOptions);
 
 		DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		final StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
+
+		final StartupOptions startupOptions = getStartupOptions(tableOptions);
+		final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+		// add topic-partition discovery
+		properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+			String.valueOf(tableOptions
+				.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
+				.map(val -> val.toMillis())
+				.orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
+
 		return createKafkaTableSource(
-				producedDataType,
-				topic,
-				getKafkaProperties(context.getCatalogTable().getOptions()),
-				decodingFormat,
-				startupOptions.startupMode,
-				startupOptions.specificOffsets,
-				startupOptions.startupTimestampMillis);
+			producedDataType,
+			topic.map(value ->
+				Arrays
+					.stream(value.split(","))
+					.map(String::trim)
+					.collect(Collectors.toList()))
+				.orElse(null),
+			pattern.map(value -> Pattern.compile(value)).orElse(null),

Review comment:
       I would suggest adding static util methods `List<String> getTopics(ReadableConfig)` and `Pattern getTopicPattern(ReadableConfig)` to `KafkaOptions`.

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -352,6 +442,49 @@ public void testInvalidSinkSemantic(){
 			new Configuration(),
 			Thread.currentThread().getContextClassLoader());
 	}
+
+	@Test
+	public void testSinkWithTopicListOrTopicPattern(){
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"sinkTable");
+
+		Map<String, String> modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.put("topic", TOPICS);
+				options.put("scan.startup.mode", "earliest-offset");
+				options.remove("specific-offsets");
+			});
+		CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException("Flink Kafka sink currently doesn't support topic list.")));
+		FactoryUtil.createTableSink(
+			null,
+			objectIdentifier,
+			sinkTable,
+			new Configuration(),
+			Thread.currentThread().getContextClassLoader());
+
+		modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.put("topic-pattern", TOPIC_REGEX);
+			});
+		sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException("Flink Kafka sink currently doesn't support 'topic-pattern'.")));

Review comment:
       We shouldn't use `thrown` in one test multiple times, because only the first one will be triggered. 
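
    One possible restructuring is to split the two expectations into separate test methods; the `createTableSink(...)` helper below is hypothetical shorthand for the `FactoryUtil.createTableSink(...)` call in the quoted test:

    ```java
    	@Test
    	public void testSinkWithTopicList() {
    		thrown.expect(ValidationException.class);
    		thrown.expect(containsCause(new ValidationException(
    			"Flink Kafka sink currently doesn't support topic list.")));
    		createTableSink(getModifiedOptions(
    			getFullSourceOptions(),
    			options -> options.put("topic", TOPICS)));
    	}

    	@Test
    	public void testSinkWithTopicPattern() {
    		thrown.expect(ValidationException.class);
    		thrown.expect(containsCause(new ValidationException(
    			"Flink Kafka sink currently doesn't support 'topic-pattern'.")));
    		createTableSink(getModifiedOptions(
    			getFullSourceOptions(),
    			options -> options.put("topic-pattern", TOPIC_REGEX)));
    	}
    ```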

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -152,6 +159,13 @@ Connector Options
       <td>Long</td>
       <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td>
     </tr>
+    <tr>
+      <td><h5>scan.topic-partition-discovery.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(disabled)</td>

Review comment:
       ```suggestion
         <td style="word-wrap: break-word;">(none)</td>
   ```
   
    Currently, we don't have a `disabled` entry for the `Default` value. We can add more explanation for the default value in the Description. 

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -152,6 +159,13 @@ Connector Options
       <td>Long</td>
       <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td>
     </tr>
+    <tr>
+      <td><h5>scan.topic-partition-discovery.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(disabled)</td>
+      <td>Duration</td>
+      <td>Optional interval for consumer to discover dynamically created Kafka partitions periodically.</td>

Review comment:
       ```suggestion
         <td>Interval for consumer to discover dynamically created Kafka topics and partitions periodically.</td>
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>

Review comment:
       We can simplify this `required for sink`.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -177,6 +191,14 @@ Connector Options
 
 Features
 ----------------
+### Topic and Partition Discovery
+
+The config option `topic` and `topic-pattern` specifies the topics or topic pattern to consume for source. The config option `topic` can accept topic list by inputting value like 'topic-1, topic-2'. 
+The config option `topic-pattern` will use regex regression to discover the matched topic. The config option `scan.topic-partition-discovery.interval` enables Kafka connector to discover dynamically created Kafka partitions.
+
+Please refer to [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-consumers-topic-and-partition-discovery) for more caveats about delivery guarantees.

Review comment:
       ```suggestion
   Please refer to [Kafka DataStream Connector documentation]({% link dev/connectors/kafka.md %}#kafka-consumers-topic-and-partition-discovery) for more about topic and partition discovery.
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -177,6 +191,14 @@ Connector Options
 
 Features
 ----------------
+### Topic and Partition Discovery
+
+The config option `topic` and `topic-pattern` specifies the topics or topic pattern to consume for source. The config option `topic` can accept topic list by inputting value like 'topic-1, topic-2'. 
+The config option `topic-pattern` will use regex regression to discover the matched topic. The config option `scan.topic-partition-discovery.interval` enables Kafka connector to discover dynamically created Kafka partitions.

Review comment:
       ```suggestion
    The config option `topic-pattern` will use a regular expression to discover the matched topic. For example, if the `topic-pattern` is `test-topic-[0-9]`, then all topics with names that match the specified regular expression (starting with `test-topic-` and ending with a single digit) will be subscribed by the consumer when the job starts running.
   
   To allow the consumer to discover dynamically created topics after the job started running, set a non-negative value for `scan.topic-partition-discovery.interval`. This allows the consumer to discover partitions of new topics with names that also match the specified pattern.
   ```

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -187,6 +197,62 @@ public void testTableSourceCommitOnCheckpointsDisabled() {
 		assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
 	}
 
+	@Test
+	public void testTableSourceWithPattern() {
+		// prepare parameters for Kafka table source
+		final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+
+		DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+			new TestFormatFactory.DecodingFormatMock(",", true);
+
+		// Construct table source using options and table source factory
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"scanTable");
+
+		final Map<String, String> modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.remove("topic");
+				options.put("topic-pattern", TOPIC_REGEX);
+				options.put("scan.startup.mode", KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST);
+				options.remove("scan.startup.specific-offsets");
+			});
+		CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
+
+		final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+			objectIdentifier,
+			catalogTable,
+			new Configuration(),
+			Thread.currentThread().getContextClassLoader());
+
+		// Test scan source equals
+		final KafkaDynamicSourceBase expectedKafkaSource = getExpectedScanSource(
+			producedDataType,
+			null,
+			Pattern.compile(TOPIC_REGEX),
+			KAFKA_PROPERTIES,
+			decodingFormat,
+			StartupMode.EARLIEST,
+			specificOffsets,
+			0);
+		final KafkaDynamicSourceBase actualKafkaSource = (KafkaDynamicSourceBase) actualSource;
+		assertEquals(actualKafkaSource, expectedKafkaSource);
+
+		// Test Kafka consumer
+		ScanTableSource.ScanRuntimeProvider provider =
+			actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+		assertThat(provider, instanceOf(SourceFunctionProvider.class));
+		final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
+		final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
+		assertThat(sourceFunction, instanceOf(getExpectedConsumerClass()));
+		//  Test commitOnCheckpoints flag should be true when set consumer group
+		assertTrue(((FlinkKafkaConsumerBase) sourceFunction).getEnableCommitOnCheckpoints());

Review comment:
       This has been verified in the other test. We don't need to test it again here. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537",
       "triggerID" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1a4e5ee2a2f11ba650a98c1a211cea75736fe79d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537",
       "triggerID" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4984",
       "triggerID" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4991",
       "triggerID" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4994",
       "triggerID" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5025",
       "triggerID" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5205",
       "triggerID" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2769a8edbd4a4e28ca6b75f39e69d95007b5c4b2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5699",
       "triggerID" : "2769a8edbd4a4e28ca6b75f39e69d95007b5c4b2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2769a8edbd4a4e28ca6b75f39e69d95007b5c4b2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5699) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537",
       "triggerID" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4984",
       "triggerID" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4991",
       "triggerID" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4994",
       "triggerID" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5025",
       "triggerID" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5205",
       "triggerID" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2769a8edbd4a4e28ca6b75f39e69d95007b5c4b2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2769a8edbd4a4e28ca6b75f39e69d95007b5c4b2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4b06551a39ce8d9d6c5f19948e36d493e3d592e3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5205) 
   * 2769a8edbd4a4e28ca6b75f39e69d95007b5c4b2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537",
       "triggerID" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4984",
       "triggerID" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4991",
       "triggerID" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4994",
       "triggerID" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5025",
       "triggerID" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5205",
       "triggerID" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4b06551a39ce8d9d6c5f19948e36d493e3d592e3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5205) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12908:
URL: https://github.com/apache/flink/pull/12908#discussion_r472680410



##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
##########
@@ -54,7 +59,8 @@
 	 */
 	public Kafka010DynamicSource(
 			DataType outputDataType,
-			String topic,
+			@Nullable List<String> topics,
+			@Nullable Pattern topicPattern,

Review comment:
       Currently, it is very verbose to pass these two parameters around together here and there. An improvement would be to use `KafkaTopicsDescriptor`, but that can be a separate issue in the future. 
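
    For reference, a rough sketch of how the two nullable parameters could collapse into one descriptor (the `KafkaTopicsDescriptor` constructor signature is assumed from the DataStream Kafka connector; this is not part of this PR):

    ```java
    	// Exactly one of 'topics' / 'topicPattern' is expected to be non-null.
    	KafkaTopicsDescriptor topicsDescriptor = (topics != null)
    		? new KafkaTopicsDescriptor(topics, null)
    		: new KafkaTopicsDescriptor(null, topicPattern);
    ```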




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12908:
URL: https://github.com/apache/flink/pull/12908#discussion_r464809980



##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -72,26 +80,34 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
 		ReadableConfig tableOptions = helper.getOptions();
-
-		String topic = tableOptions.get(TOPIC);
 		DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
 				DeserializationFormatFactory.class,
 				FactoryUtil.FORMAT);
 		// Validate the option data type.
 		helper.validateExcept(PROPERTIES_PREFIX);
 		// Validate the option values.
-		validateTableOptions(tableOptions);
+		validateTableSourceOptions(tableOptions);
 
 		DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		final StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
+
+		final StartupOptions startupOptions = getStartupOptions(tableOptions);
+		final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+		// add topic-partition discovery
+		properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+			String.valueOf(tableOptions
+				.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
+				.map(val -> val.toMillis())

Review comment:
       ```suggestion
   				.map(Duration::toMillis)
   ```

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -160,12 +178,45 @@ private KafkaOptions() {}
 	// Validation
 	// --------------------------------------------------------------------------------------------
 
-	public static void validateTableOptions(ReadableConfig tableOptions) {
+	public static void validateTableSourceOptions(ReadableConfig tableOptions) {
+		validateSourceTopic(tableOptions);
 		validateScanStartupMode(tableOptions);
+	}
+
+	public static void validateTableSinkOptions(ReadableConfig tableOptions) {
+		validateSinkTopic(tableOptions);
 		validateSinkPartitioner(tableOptions);
 		validateSinkSemantic(tableOptions);
 	}
 
+	public static void validateSourceTopic(ReadableConfig tableOptions) {
+		Optional<String> topic = tableOptions.getOptional(TOPIC);
+		Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
+
+		if (topic.isPresent() && pattern.isPresent()) {
+			throw new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together.");
+		}
+
+		if (!topic.isPresent() && !pattern.isPresent()) {
+			throw new ValidationException("Either 'topic' or 'topic-pattern' must be set.");
+		}
+	}
+
+	public static void validateSinkTopic(ReadableConfig tableOptions) {
+		String errorMessageTemp = "Flink Kafka sink currently only supports single topic, but got %s: %s.";
+		if (!isSingleTopic(tableOptions)) {
+			if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
+				throw new ValidationException(String.format(
+					errorMessageTemp, "'topic-pattern'", tableOptions.get(TOPIC_PATTERN)
+				));
+			} else {
+				throw new ValidationException(String.format(
+					errorMessageTemp, "topic-list", tableOptions.get(TOPIC)

Review comment:
       "topic-list" -> "topic"? We don't have "topic-list" option.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -72,26 +80,34 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
 		ReadableConfig tableOptions = helper.getOptions();
-
-		String topic = tableOptions.get(TOPIC);
 		DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
 				DeserializationFormatFactory.class,
 				FactoryUtil.FORMAT);
 		// Validate the option data type.
 		helper.validateExcept(PROPERTIES_PREFIX);
 		// Validate the option values.
-		validateTableOptions(tableOptions);
+		validateTableSourceOptions(tableOptions);
 
 		DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		final StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
+
+		final StartupOptions startupOptions = getStartupOptions(tableOptions);
+		final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+		// add topic-partition discovery
+		properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,

Review comment:
       ???

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -139,10 +152,12 @@ public void testTableSource() {
 				Thread.currentThread().getContextClassLoader());
 
 		// Test scan source equals
+		KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis", "1000");

Review comment:
       Is this still needed? We have already set it in the static block.
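
   For context, a hedged sketch of the kind of static-block initialization the comment refers to (the exact field names and contents in the test base are an assumption, not the actual code):

   ```java
   	// Assumed initialization: if the discovery interval is already set here,
   	// the per-test setProperty call above is redundant.
   	private static final Properties KAFKA_SOURCE_PROPERTIES = new Properties();
   	static {
   		KAFKA_SOURCE_PROPERTIES.putAll(KAFKA_PROPERTIES);
   		KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis", "1000");
   	}
   ```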

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -168,10 +185,14 @@ protected abstract KafkaDynamicSinkBase createKafkaTableSink(
 	@Override
 	public Set<ConfigOption<?>> optionalOptions() {
 		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(TOPIC);
+		options.add(TOPIC_PATTERN);
 		options.add(PROPS_GROUP_ID);
 		options.add(SCAN_STARTUP_MODE);
 		options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+		options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
 		options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+		options.add(SCAN_TOPIC_PARTITION_DISCOVERY);

Review comment:
       Duplicate: `SCAN_TOPIC_PARTITION_DISCOVERY` is added twice; please remove the second add.
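
   A sketch of the de-duplicated block (options taken from the diff above; a `HashSet` ignores the second add, but each option should only be listed once):

   ```java
   	options.add(TOPIC);
   	options.add(TOPIC_PATTERN);
   	options.add(PROPS_GROUP_ID);
   	options.add(SCAN_STARTUP_MODE);
   	options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
   	options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
   	options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
   ```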

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -239,9 +293,25 @@ public static KafkaSinkSemantic getSinkSemantic(ReadableConfig tableOptions){
 		}
 	}
 
-	public static StartupOptions getStartupOptions(
-			ReadableConfig tableOptions,
-			String topic) {
+	public static List<String> getSourceTopics(ReadableConfig tableOptions) {
+		return tableOptions.getOptional(TOPIC).map(value ->
+			Arrays
+				.stream(value.split(","))
+				.map(String::trim)
+				.collect(Collectors.toList()))
+			.orElse(null);
+	}
+
+	public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) {
+		return tableOptions.getOptional(TOPIC_PATTERN).map(value -> Pattern.compile(value)).orElse(null);
+	}
+
+	private static boolean isSingleTopic(ReadableConfig tableOptions) {
+		// Option 'topic-pattern' is regarded as multi-topics.
+		return tableOptions.getOptional(TOPIC).isPresent() && tableOptions.get(TOPIC).split(",").length == 1;

Review comment:
       The community recommends using a List ConfigOption for list values; the framework will handle the parsing. This also changes the separator to `;`, but that is more aligned with the other list options. You can declare a List ConfigOption like this:
   
   ```java
   	public static final ConfigOption<List<String>> TOPIC = ConfigOptions
   			.key("topic")
   			.stringType()
   			.asList()
   			.noDefaultValue()
   			.withDescription("...");
   ```
   
   Then you can call `return tableOptions.getOptional(TOPIC).map(t -> t.size() == 1).orElse(false);` here. 
   
   
   Sorry for the late reminder.
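
   Building on that, a hedged sketch of how the helpers from the diff could look once `TOPIC` is a `List<String>` option (method names are taken from the diff above; the exact wiring is an assumption):

   ```java
   	public static List<String> getSourceTopics(ReadableConfig tableOptions) {
   		// The framework already parsed the list, so no manual splitting is needed.
   		return tableOptions.getOptional(TOPIC).orElse(null);
   	}

   	private static boolean isSingleTopic(ReadableConfig tableOptions) {
   		// Option 'topic-pattern' is always regarded as multi-topic.
   		return tableOptions.getOptional(TOPIC).map(t -> t.size() == 1).orElse(false);
   	}
   ```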




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4537",
       "triggerID" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4984",
       "triggerID" : "1fc41a223aec43aa63b693df90b803539648c6ed",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4991",
       "triggerID" : "7cfd0a250b0f596cf85d1296c6707fd1956a267f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4994",
       "triggerID" : "a38822387a969f7b8c6149d11b264e0cb34a4f43",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5025",
       "triggerID" : "8d2e2d4bdf6cc90a3fa7b6be4d5ad4bc469feec6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5205",
       "triggerID" : "4b06551a39ce8d9d6c5f19948e36d493e3d592e3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2769a8edbd4a4e28ca6b75f39e69d95007b5c4b2",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5699",
       "triggerID" : "2769a8edbd4a4e28ca6b75f39e69d95007b5c4b2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4b06551a39ce8d9d6c5f19948e36d493e3d592e3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5205) 
   * 2769a8edbd4a4e28ca6b75f39e69d95007b5c4b2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5699) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658771971


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1a4e5ee2a2f11ba650a98c1a211cea75736fe79d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1a4e5ee2a2f11ba650a98c1a211cea75736fe79d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-658758773


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 1a4e5ee2a2f11ba650a98c1a211cea75736fe79d (Wed Jul 15 13:11:58 UTC 2020)
   
    ✅no warnings
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] fsk119 commented on pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
fsk119 commented on pull request #12908:
URL: https://github.com/apache/flink/pull/12908#issuecomment-659150149


   @wuchong  CC


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] fsk119 commented on a change in pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

Posted by GitBox <gi...@apache.org>.
fsk119 commented on a change in pull request #12908:
URL: https://github.com/apache/flink/pull/12908#discussion_r462161021



##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -187,6 +197,62 @@ public void testTableSourceCommitOnCheckpointsDisabled() {
 		assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
 	}
 
+	@Test
+	public void testTableSourceWithPattern() {
+		// prepare parameters for Kafka table source
+		final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+
+		DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+			new TestFormatFactory.DecodingFormatMock(",", true);
+
+		// Construct table source using options and table source factory
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"scanTable");
+
+		final Map<String, String> modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.remove("topic");
+				options.put("topic-pattern", TOPIC_REGEX);
+				options.put("scan.startup.mode", KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST);
+				options.remove("scan.startup.specific-offsets");
+			});
+		CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
+
+		final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+			objectIdentifier,
+			catalogTable,
+			new Configuration(),
+			Thread.currentThread().getContextClassLoader());
+
+		// Test scan source equals
+		final KafkaDynamicSourceBase expectedKafkaSource = getExpectedScanSource(
+			producedDataType,
+			null,
+			Pattern.compile(TOPIC_REGEX),
+			KAFKA_PROPERTIES,
+			decodingFormat,
+			StartupMode.EARLIEST,
+			specificOffsets,

Review comment:
       Hmm, I think it's simpler to use a new HashMap to store the option values, because we need to add more options such as the format option.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org