You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/26 13:27:52 UTC

[flink] branch release-1.11 updated: [FLINK-17802][kafka] Set offset commit only if group id is configured for new Kafka Table source

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new acf94d7  [FLINK-17802][kafka] Set offset commit only if group id is configured for new Kafka Table source
acf94d7 is described below

commit acf94d7748b08ae56d28214614dec045287e7d26
Author: Leonard Xu <xb...@163.com>
AuthorDate: Tue May 26 21:27:27 2020 +0800

    [FLINK-17802][kafka] Set offset commit only if group id is configured for new Kafka Table source
    
    This closes #12254
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |  2 +-
 .../kafka/table/KafkaDynamicSourceBase.java        |  1 +
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java | 31 +++++++++++-----------
 .../table/KafkaDynamicTableFactoryTestBase.java    | 31 ++++++++++++++++++++++
 4 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 733011f..e8c0ae4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -1119,7 +1119,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@VisibleForTesting
-	boolean getEnableCommitOnCheckpoints() {
+	public boolean getEnableCommitOnCheckpoints() {
 		return enableCommitOnCheckpoints;
 	}
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
index 48f77d9..0c97715 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
@@ -209,6 +209,7 @@ public abstract class KafkaDynamicSourceBase implements ScanTableSource {
 				kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
 				break;
 			}
+		kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
 		return kafkaConsumer;
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 218e55d..5e4dc91 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -170,24 +170,23 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 		final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
 		actualKafkaSource.getDataStream(mock);
 		assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+		// The commitOnCheckpoints flag should be true when a consumer group is set.
 		assertTrue(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
+	}
 
-		Properties propsWithoutGroupId = new Properties();
-		propsWithoutGroupId.setProperty("bootstrap.servers", "dummy");
-
-		final KafkaTableSourceBase sourceWithoutGroupId = getExpectedKafkaTableSource(
-			schema,
-			Optional.of(PROC_TIME),
-			rowtimeAttributeDescriptors,
-			fieldMapping,
-			TOPIC,
-			propsWithoutGroupId,
-			deserializationSchema,
-			StartupMode.LATEST,
-			new HashMap<>(),
-			0L);
-
-		sourceWithoutGroupId.getDataStream(mock);
+	@Test
+	public void testTableSourceCommitOnCheckpointsDisabled() {
+		Map<String, String> propertiesMap = new HashMap<>();
+		createKafkaSourceProperties().forEach((k, v) -> {
+			if (!k.equals("connector.properties.group.id")) {
+				propertiesMap.put(k, v);
+			}
+		});
+		final TableSource<?> tableSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
+			.createStreamTableSource(propertiesMap);
+		final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
+		// The commitOnCheckpoints flag should be false when no consumer group is set.
+		((KafkaTableSourceBase) tableSource).getDataStream(mock);
 		assertTrue(mock.sourceFunction instanceof FlinkKafkaConsumerBase);
 		assertFalse(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
 	}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
index 29d11fa..9ea36ff 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
@@ -61,7 +62,9 @@ import java.util.function.Consumer;
 import static org.apache.flink.util.CoreMatchers.containsCause;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Abstract test base for {@link KafkaDynamicTableFactoryBase}.
@@ -153,6 +156,34 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 		final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
 		final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
 		assertThat(sourceFunction, instanceOf(getExpectedConsumerClass()));
+		// The commitOnCheckpoints flag should be true when a consumer group is set.
+		assertTrue(((FlinkKafkaConsumerBase) sourceFunction).getEnableCommitOnCheckpoints());
+	}
+
+	@Test
+	public void testTableSourceCommitOnCheckpointsDisabled() {
+		// Construct the table source using the options and the table source factory.
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"scanTable");
+		Map<String, String> tableOptions = getFullSourceOptions();
+		tableOptions.remove("properties.group.id");
+		CatalogTable catalogTable = createKafkaSourceCatalogTable(tableOptions);
+		final DynamicTableSource tableSource = FactoryUtil.createTableSource(null,
+			objectIdentifier,
+			catalogTable,
+			new Configuration(),
+			Thread.currentThread().getContextClassLoader());
+
+		// The commitOnCheckpoints flag should be false when no consumer group is set.
+		assertThat(tableSource, instanceOf(KafkaDynamicSourceBase.class));
+		ScanTableSource.ScanRuntimeProvider providerWithoutGroupId = ((KafkaDynamicSourceBase) tableSource)
+			.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+		assertThat(providerWithoutGroupId, instanceOf(SourceFunctionProvider.class));
+		final SourceFunctionProvider functionProviderWithoutGroupId = (SourceFunctionProvider) providerWithoutGroupId;
+		final SourceFunction<RowData> function = functionProviderWithoutGroupId.createSourceFunction();
+		assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
 	}
 
 	@Test