Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/27 09:31:36 UTC

[GitHub] [flink] wuchong commented on a change in pull request #12908: [FLINK-18449][table sql/api]Kafka topic discovery & partition discove…

wuchong commented on a change in pull request #12908:
URL: https://github.com/apache/flink/pull/12908#discussion_r460723655



##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
##########
@@ -73,17 +80,26 @@ public Kafka010DynamicSource(
 
 	@Override
 	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema) {
-		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+		return new FlinkKafkaConsumer010<>(topics, deserializationSchema, properties);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern pattern,
+		Properties properties,
+		DeserializationSchema<RowData> deserializationSchema) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -161,11 +176,22 @@ private KafkaOptions() {}
 	// --------------------------------------------------------------------------------------------
 
 	public static void validateTableOptions(ReadableConfig tableOptions) {
+		validateTopic(tableOptions);
 		validateScanStartupMode(tableOptions);
 		validateSinkPartitioner(tableOptions);
 		validateSinkSemantic(tableOptions);
 	}
 
+	public static void validateTopic(ReadableConfig tableOptions) {
+		Optional<String> topic = tableOptions.getOptional(TOPIC);
+		Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
+
+		if ((topic.isPresent() && pattern.isPresent()) || !(topic.isPresent() || pattern.isPresent())) {

Review comment:
       Split this into two exceptions. 
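
A minimal sketch of what the split could look like (the exception messages here are placeholders, not wording from the PR):

```java
public static void validateTopic(ReadableConfig tableOptions) {
    Optional<String> topic = tableOptions.getOptional(TOPIC);
    Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);

    // Case 1: both options are set.
    if (topic.isPresent() && pattern.isPresent()) {
        throw new ValidationException(
            "Option 'topic' and 'topic-pattern' shouldn't be set together.");
    }
    // Case 2: neither option is set.
    if (!topic.isPresent() && !pattern.isPresent()) {
        throw new ValidationException(
            "Either 'topic' or 'topic-pattern' must be set.");
    }
}
```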

##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
##########
@@ -73,17 +80,26 @@ public Kafka010DynamicSource(
 
 	@Override
 	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema) {
-		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+		return new FlinkKafkaConsumer010<>(topics, deserializationSchema, properties);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern pattern,

Review comment:
       topicPattern

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
##########
@@ -164,34 +178,44 @@ public int hashCode() {
 	/**
 	 * Creates a version-specific Kafka consumer.
 	 *
-	 * @param topic                 Kafka topic to consume.
+	 * @param topics                Kafka topic to consume.
 	 * @param properties            Properties for the Kafka consumer.
 	 * @param deserializationSchema Deserialization schema to use for Kafka records.
 	 * @return The version-specific Kafka consumer
 	 */
 	protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema);
 
+	/**
+	 * Creates a version-specific Kafka consumer.
+	 *
+	 * @param topicPattern          Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @return The version-specific Kafka consumer
+	 */
+	protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern topicPattern,
+		Properties properties,
+		DeserializationSchema<RowData> deserializationSchema);

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
##########
@@ -93,16 +101,21 @@
 	 *                               mode is {@link StartupMode#TIMESTAMP}.
 	 */
 	protected KafkaDynamicSourceBase(
-			DataType outputDataType,
-			String topic,
-			Properties properties,
-			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
-			StartupMode startupMode,
-			Map<KafkaTopicPartition, Long> specificStartupOffsets,
-			long startupTimestampMillis) {
+		DataType outputDataType,
+		@Nullable List<String> topics,
+		@Nullable Pattern topicPattern,
+		Properties properties,
+		DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
+		StartupMode startupMode,
+		Map<KafkaTopicPartition, Long> specificStartupOffsets,
+		long startupTimestampMillis) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSource.java
##########
@@ -73,17 +80,26 @@ public Kafka011DynamicSource(
 
 	@Override
 	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema) {
-		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
+		return new FlinkKafkaConsumer011<>(topics, deserializationSchema, properties);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern topicPattern,
+		Properties properties,
+		DeserializationSchema<RowData> deserializationSchema) {

Review comment:
       Indent.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -70,17 +77,26 @@ public KafkaDynamicSource(
 
 	@Override
 	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-			String topic,
+			List<String> topics,
 			Properties properties,
 			DeserializationSchema<RowData> deserializationSchema) {
-		return new FlinkKafkaConsumer<>(topic, deserializationSchema, properties);
+		return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+		Pattern topicPattern,
+		Properties properties,
+		DeserializationSchema<RowData> deserializationSchema) {

Review comment:
       Indent.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name from which the table is read.</td>
+      <td>Topic name(s) from which the table is read. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>.</td>
     </tr>
+    <tr>
+      <td><h5>topic-pattern</h5></td>
+      <td>optional for source(use 'topic' instead if not set)</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic pattern from which the table is read. It will use input value to build regex expression to discover matched topics.</td>

Review comment:
       The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources. 

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -109,10 +133,18 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 		// Validate the option values.
 		validateTableOptions(tableOptions);
 
+		if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()){
+			throw new ValidationException("Flink Kafka sink currently doesn't support 'topic-pattern'.");
+		}
+		String[] topics = tableOptions.get(TOPIC).split(",");
+		if (topics.length > 1) {
+			throw new ValidationException("Flink Kafka sink currently doesn't support topic list.");
+		}

Review comment:
       What about having a `validateTableSinkOptions` and a `validateTableSourceOptions`? We can then move this validation into `validateSinkTopic()`.
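
In case it helps, a rough sketch of that split inside `KafkaOptions` (the grouping of the existing validations and the extra presence check for 'topic' on the sink side are assumptions, not part of the PR):

```java
public static void validateTableSourceOptions(ReadableConfig tableOptions) {
    validateTopic(tableOptions);
    validateScanStartupMode(tableOptions);
}

public static void validateTableSinkOptions(ReadableConfig tableOptions) {
    validateSinkTopic(tableOptions);
    validateSinkPartitioner(tableOptions);
    validateSinkSemantic(tableOptions);
}

public static void validateSinkTopic(ReadableConfig tableOptions) {
    if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
        throw new ValidationException("Flink Kafka sink currently doesn't support 'topic-pattern'.");
    }
    Optional<String> topic = tableOptions.getOptional(TOPIC);
    if (!topic.isPresent()) {
        // Assumption: 'topic' must be present for sinks once it becomes optional for sources.
        throw new ValidationException("'topic' is required for the Kafka sink.");
    }
    if (topic.get().split(",").length > 1) {
        throw new ValidationException("Flink Kafka sink currently doesn't support topic list.");
    }
}
```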

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -193,6 +219,9 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) {
 									SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
 									SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
 						}
+						if (!tableOptions.getOptional(TOPIC).isPresent() || tableOptions.get(TOPIC).split(",").length > 1){

Review comment:
       Add a util method `boolean isSingleTopic(ReadableConfig)`. It can also be used on the sink side.
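
Something along these lines, perhaps (treating an absent 'topic' as not-single is an assumption):

```java
private static boolean isSingleTopic(ReadableConfig tableOptions) {
    // A comma-separated value in 'topic' is treated as a topic list.
    return tableOptions.getOptional(TOPIC)
        .map(value -> value.split(",").length == 1)
        .orElse(false);
}
```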
   

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name from which the table is read.</td>
+      <td>Topic name(s) from which the table is read. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>.</td>
     </tr>
+    <tr>
+      <td><h5>topic-pattern</h5></td>
+      <td>optional for source(use 'topic' instead if not set)</td>

Review comment:
       We can simplify this to `optional`.

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -187,6 +197,62 @@ public void testTableSourceCommitOnCheckpointsDisabled() {
 		assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
 	}
 
+	@Test
+	public void testTableSourceWithPattern() {
+		// prepare parameters for Kafka table source
+		final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+
+		DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+			new TestFormatFactory.DecodingFormatMock(",", true);
+
+		// Construct table source using options and table source factory
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"scanTable");
+
+		final Map<String, String> modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.remove("topic");
+				options.put("topic-pattern", TOPIC_REGEX);
+				options.put("scan.startup.mode", KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST);
+				options.remove("scan.startup.specific-offsets");
+			});
+		CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
+
+		final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+			objectIdentifier,
+			catalogTable,
+			new Configuration(),
+			Thread.currentThread().getContextClassLoader());
+
+		// Test scan source equals
+		final KafkaDynamicSourceBase expectedKafkaSource = getExpectedScanSource(
+			producedDataType,
+			null,
+			Pattern.compile(TOPIC_REGEX),
+			KAFKA_PROPERTIES,
+			decodingFormat,
+			StartupMode.EARLIEST,
+			specificOffsets,

Review comment:
       Better to use `new HashMap<>()`.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name from which the table is read.</td>
+      <td>Topic name(s) from which the table is read. It also supports topic list for source by separating topic by comma like <code>'topic-1, topic-2'</code>.</td>

Review comment:
       ```suggestion
      <td>Topic name(s) to read data from when the table is used as a source. It also supports a topic list for sources by separating topics with a comma, like <code>'topic-1, topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as a sink, the topic name is the topic to write data to. Note that topic lists are not supported for sinks.</td>
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -177,6 +191,14 @@ Connector Options
 
 Features
 ----------------
+### Topic and Partition Discovery
+
+The config option `topic` and `topic-pattern` specifies the topics or topic pattern to consume for source. The config option `topic` can accept topic list by inputting value like 'topic-1, topic-2'. 

Review comment:
       ```suggestion
  The config options `topic` and `topic-pattern` specify the topics or topic pattern to consume for the source. The config option `topic` can accept a topic list using a comma separator like 'topic-1, topic-2'. 
   ```

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -72,26 +82,41 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
 		ReadableConfig tableOptions = helper.getOptions();
-
-		String topic = tableOptions.get(TOPIC);
 		DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
 				DeserializationFormatFactory.class,
 				FactoryUtil.FORMAT);
+		Optional<String> topic = tableOptions.getOptional(TOPIC);
+		Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
 		// Validate the option data type.
 		helper.validateExcept(PROPERTIES_PREFIX);
 		// Validate the option values.
 		validateTableOptions(tableOptions);
 
 		DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		final StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
+
+		final StartupOptions startupOptions = getStartupOptions(tableOptions);
+		final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+		// add topic-partition discovery
+		properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+			String.valueOf(tableOptions
+				.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
+				.map(val -> val.toMillis())
+				.orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
+
 		return createKafkaTableSource(
-				producedDataType,
-				topic,
-				getKafkaProperties(context.getCatalogTable().getOptions()),
-				decodingFormat,
-				startupOptions.startupMode,
-				startupOptions.specificOffsets,
-				startupOptions.startupTimestampMillis);
+			producedDataType,
+			topic.map(value ->
+				Arrays
+					.stream(value.split(","))
+					.map(String::trim)
+					.collect(Collectors.toList()))
+				.orElse(null),
+			pattern.map(value -> Pattern.compile(value)).orElse(null),

Review comment:
       I would suggest adding static util methods `List<String> getTopics(ReadableConfig)` and `Pattern getTopicPattern(ReadableConfig)` to `KafkaOptions`.
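
For illustration, those helpers could just lift the inline parsing above into `KafkaOptions` (returning null when the option is absent mirrors the diff rather than a settled design):

```java
public static List<String> getTopics(ReadableConfig tableOptions) {
    return tableOptions.getOptional(TOPIC)
        .map(value -> Arrays.stream(value.split(","))
            .map(String::trim)
            .collect(Collectors.toList()))
        .orElse(null);
}

public static Pattern getTopicPattern(ReadableConfig tableOptions) {
    return tableOptions.getOptional(TOPIC_PATTERN)
        .map(Pattern::compile)
        .orElse(null);
}
```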

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -352,6 +442,49 @@ public void testInvalidSinkSemantic(){
 			new Configuration(),
 			Thread.currentThread().getContextClassLoader());
 	}
+
+	@Test
+	public void testSinkWithTopicListOrTopicPattern(){
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"sinkTable");
+
+		Map<String, String> modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.put("topic", TOPICS);
+				options.put("scan.startup.mode", "earliest-offset");
+				options.remove("specific-offsets");
+			});
+		CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException("Flink Kafka sink currently doesn't support topic list.")));
+		FactoryUtil.createTableSink(
+			null,
+			objectIdentifier,
+			sinkTable,
+			new Configuration(),
+			Thread.currentThread().getContextClassLoader());
+
+		modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.put("topic-pattern", TOPIC_REGEX);
+			});
+		sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException("Flink Kafka sink currently doesn't support 'topic-pattern'.")));

Review comment:
       We shouldn't use `thrown` in one test multiple times, because only the first one will be triggered. 
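
One way to fix it is to split the two expectations into separate test methods, roughly like this (the setup of `objectIdentifier` and `sinkTable` is elided, it stays the same as in the diff above):

```java
@Test
public void testSinkWithTopicList() {
    // ... build objectIdentifier and sinkTable with a comma-separated 'topic' as in the diff above ...
    thrown.expect(ValidationException.class);
    thrown.expect(containsCause(new ValidationException(
        "Flink Kafka sink currently doesn't support topic list.")));
    FactoryUtil.createTableSink(
        null,
        objectIdentifier,
        sinkTable,
        new Configuration(),
        Thread.currentThread().getContextClassLoader());
}

@Test
public void testSinkWithTopicPattern() {
    // ... build objectIdentifier and sinkTable with 'topic-pattern' as in the diff above ...
    thrown.expect(ValidationException.class);
    thrown.expect(containsCause(new ValidationException(
        "Flink Kafka sink currently doesn't support 'topic-pattern'.")));
    FactoryUtil.createTableSink(
        null,
        objectIdentifier,
        sinkTable,
        new Configuration(),
        Thread.currentThread().getContextClassLoader());
}
```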

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -152,6 +159,13 @@ Connector Options
       <td>Long</td>
       <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td>
     </tr>
+    <tr>
+      <td><h5>scan.topic-partition-discovery.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(disabled)</td>

Review comment:
       ```suggestion
         <td style="word-wrap: break-word;">(none)</td>
   ```
   
  Currently, we don't have `disabled` as a `Default` value. We can add more explanation about the default value in the Description. 

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -152,6 +159,13 @@ Connector Options
       <td>Long</td>
       <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td>
     </tr>
+    <tr>
+      <td><h5>scan.topic-partition-discovery.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(disabled)</td>
+      <td>Duration</td>
+      <td>Optional interval for consumer to discover dynamically created Kafka partitions periodically.</td>

Review comment:
       ```suggestion
         <td>Interval for consumer to discover dynamically created Kafka topics and partitions periodically.</td>
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -100,11 +100,18 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required</td>
+      <td>required for sink, optional for source(use 'topic-pattern' instead if not set)</td>

Review comment:
       We can simplify this to `required for sink`.

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -177,6 +191,14 @@ Connector Options
 
 Features
 ----------------
+### Topic and Partition Discovery
+
+The config option `topic` and `topic-pattern` specifies the topics or topic pattern to consume for source. The config option `topic` can accept topic list by inputting value like 'topic-1, topic-2'. 
+The config option `topic-pattern` will use regex regression to discover the matched topic. The config option `scan.topic-partition-discovery.interval` enables Kafka connector to discover dynamically created Kafka partitions.
+
+Please refer to [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-consumers-topic-and-partition-discovery) for more caveats about delivery guarantees.

Review comment:
       ```suggestion
  Please refer to [Kafka DataStream Connector documentation]({% link dev/connectors/kafka.md %}#kafka-consumers-topic-and-partition-discovery) for more details about topic and partition discovery.
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -177,6 +191,14 @@ Connector Options
 
 Features
 ----------------
+### Topic and Partition Discovery
+
+The config option `topic` and `topic-pattern` specifies the topics or topic pattern to consume for source. The config option `topic` can accept topic list by inputting value like 'topic-1, topic-2'. 
+The config option `topic-pattern` will use regex regression to discover the matched topic. The config option `scan.topic-partition-discovery.interval` enables Kafka connector to discover dynamically created Kafka partitions.

Review comment:
       ```suggestion
  The config option `topic-pattern` will use a regular expression to discover the matched topics. For example, if the `topic-pattern` is `test-topic-[0-9]`, then all topics with names that match the specified regular expression (starting with `test-topic-` and ending with a single digit) will be subscribed by the consumer when the job starts running.
  
  To allow the consumer to discover dynamically created topics after the job has started running, set a non-negative value for `scan.topic-partition-discovery.interval`. This allows the consumer to discover partitions of new topics with names that also match the specified pattern.
   ```

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -187,6 +197,62 @@ public void testTableSourceCommitOnCheckpointsDisabled() {
 		assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
 	}
 
+	@Test
+	public void testTableSourceWithPattern() {
+		// prepare parameters for Kafka table source
+		final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+
+		DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+			new TestFormatFactory.DecodingFormatMock(",", true);
+
+		// Construct table source using options and table source factory
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"scanTable");
+
+		final Map<String, String> modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.remove("topic");
+				options.put("topic-pattern", TOPIC_REGEX);
+				options.put("scan.startup.mode", KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST);
+				options.remove("scan.startup.specific-offsets");
+			});
+		CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
+
+		final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+			objectIdentifier,
+			catalogTable,
+			new Configuration(),
+			Thread.currentThread().getContextClassLoader());
+
+		// Test scan source equals
+		final KafkaDynamicSourceBase expectedKafkaSource = getExpectedScanSource(
+			producedDataType,
+			null,
+			Pattern.compile(TOPIC_REGEX),
+			KAFKA_PROPERTIES,
+			decodingFormat,
+			StartupMode.EARLIEST,
+			specificOffsets,
+			0);
+		final KafkaDynamicSourceBase actualKafkaSource = (KafkaDynamicSourceBase) actualSource;
+		assertEquals(actualKafkaSource, expectedKafkaSource);
+
+		// Test Kafka consumer
+		ScanTableSource.ScanRuntimeProvider provider =
+			actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+		assertThat(provider, instanceOf(SourceFunctionProvider.class));
+		final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
+		final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
+		assertThat(sourceFunction, instanceOf(getExpectedConsumerClass()));
+		//  Test commitOnCheckpoints flag should be true when set consumer group
+		assertTrue(((FlinkKafkaConsumerBase) sourceFunction).getEnableCommitOnCheckpoints());

Review comment:
       This has been verified in the other test. We don't need to test it again here. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org