You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/26 13:27:52 UTC
[flink] branch release-1.11 updated: [FLINK-17802][kafka] Set
offset commit only if group id is configured for new Kafka Table source
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new acf94d7 [FLINK-17802][kafka] Set offset commit only if group id is configured for new Kafka Table source
acf94d7 is described below
commit acf94d7748b08ae56d28214614dec045287e7d26
Author: Leonard Xu <xb...@163.com>
AuthorDate: Tue May 26 21:27:27 2020 +0800
[FLINK-17802][kafka] Set offset commit only if group id is configured for new Kafka Table source
This closes #12254
---
.../connectors/kafka/FlinkKafkaConsumerBase.java | 2 +-
.../kafka/table/KafkaDynamicSourceBase.java | 1 +
.../kafka/KafkaTableSourceSinkFactoryTestBase.java | 31 +++++++++++-----------
.../table/KafkaDynamicTableFactoryTestBase.java | 31 ++++++++++++++++++++++
4 files changed, 48 insertions(+), 17 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 733011f..e8c0ae4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -1119,7 +1119,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
@VisibleForTesting
- boolean getEnableCommitOnCheckpoints() {
+ public boolean getEnableCommitOnCheckpoints() {
return enableCommitOnCheckpoints;
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
index 48f77d9..0c97715 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
@@ -209,6 +209,7 @@ public abstract class KafkaDynamicSourceBase implements ScanTableSource {
kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
break;
}
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
return kafkaConsumer;
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 218e55d..5e4dc91 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -170,24 +170,23 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
actualKafkaSource.getDataStream(mock);
assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+ // Test commitOnCheckpoints flag should be true when set consumer group.
assertTrue(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
+ }
- Properties propsWithoutGroupId = new Properties();
- propsWithoutGroupId.setProperty("bootstrap.servers", "dummy");
-
- final KafkaTableSourceBase sourceWithoutGroupId = getExpectedKafkaTableSource(
- schema,
- Optional.of(PROC_TIME),
- rowtimeAttributeDescriptors,
- fieldMapping,
- TOPIC,
- propsWithoutGroupId,
- deserializationSchema,
- StartupMode.LATEST,
- new HashMap<>(),
- 0L);
-
- sourceWithoutGroupId.getDataStream(mock);
+ @Test
+ public void testTableSourceCommitOnCheckpointsDisabled() {
+ Map<String, String> propertiesMap = new HashMap<>();
+ createKafkaSourceProperties().forEach((k, v) -> {
+ if (!k.equals("connector.properties.group.id")) {
+ propertiesMap.put(k, v);
+ }
+ });
+ final TableSource<?> tableSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
+ .createStreamTableSource(propertiesMap);
+ final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
+ // Test commitOnCheckpoints flag should be false when do not set consumer group.
+ ((KafkaTableSourceBase) tableSource).getDataStream(mock);
assertTrue(mock.sourceFunction instanceof FlinkKafkaConsumerBase);
assertFalse(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
index 29d11fa..9ea36ff 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
@@ -61,7 +62,9 @@ import java.util.function.Consumer;
import static org.apache.flink.util.CoreMatchers.containsCause;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
/**
* Abstract test base for {@link KafkaDynamicTableFactoryBase}.
@@ -153,6 +156,34 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
assertThat(sourceFunction, instanceOf(getExpectedConsumerClass()));
+ // Test commitOnCheckpoints flag should be true when set consumer group
+ assertTrue(((FlinkKafkaConsumerBase) sourceFunction).getEnableCommitOnCheckpoints());
+ }
+
+ @Test
+ public void testTableSourceCommitOnCheckpointsDisabled() {
+ //Construct table source using options and table source factory
+ ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+ "default",
+ "default",
+ "scanTable");
+ Map<String, String> tableOptions = getFullSourceOptions();
+ tableOptions.remove("properties.group.id");
+ CatalogTable catalogTable = createKafkaSourceCatalogTable(tableOptions);
+ final DynamicTableSource tableSource = FactoryUtil.createTableSource(null,
+ objectIdentifier,
+ catalogTable,
+ new Configuration(),
+ Thread.currentThread().getContextClassLoader());
+
+ // Test commitOnCheckpoints flag should be false when do not set consumer group.
+ assertThat(tableSource, instanceOf(KafkaDynamicSourceBase.class));
+ ScanTableSource.ScanRuntimeProvider providerWithoutGroupId = ((KafkaDynamicSourceBase) tableSource)
+ .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ assertThat(providerWithoutGroupId, instanceOf(SourceFunctionProvider.class));
+ final SourceFunctionProvider functionProviderWithoutGroupId = (SourceFunctionProvider) providerWithoutGroupId;
+ final SourceFunction<RowData> function = functionProviderWithoutGroupId.createSourceFunction();
+ assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints());
}
@Test