You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2020/05/19 14:34:50 UTC
[flink] branch release-1.11 updated: [FLINK-17619] Disable commit
on checkpoints if no group.id was specified for Kafka table source
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new cf29c2c [FLINK-17619] Disable commit on checkpoints if no group.id was specified for Kafka table source
cf29c2c is described below
commit cf29c2c9c21eba72535a5eb86d6f28de2803c0ee
Author: Gyula Fora <gy...@cloudera.com>
AuthorDate: Wed May 13 11:23:50 2020 +0200
[FLINK-17619] Disable commit on checkpoints if no group.id was specified for Kafka table source
Closes #12250
---
.../connectors/kafka/FlinkKafkaConsumerBase.java | 5 +++++
.../connectors/kafka/KafkaTableSourceBase.java | 1 +
.../kafka/KafkaTableSourceSinkFactoryTestBase.java | 21 +++++++++++++++++++++
3 files changed, 27 insertions(+)
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index f9f835a..84057b0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -1114,6 +1114,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
return pendingOffsetsToCommit;
}
+ @VisibleForTesting // test-only accessor: declared without an access modifier, so it is package-private
+ boolean getEnableCommitOnCheckpoints() { // called by KafkaTableSourceSinkFactoryTestBase to assert on the flag
+ return enableCommitOnCheckpoints; // returns the current value of the enableCommitOnCheckpoints field, unmodified
+ }
+
/**
* Creates state serializer for kafka topic partition to offset tuple.
* Using of the explicit state serializer with KryoSerializer is needed because otherwise
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
index d6195e6..ef4051a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
@@ -291,6 +291,7 @@ public abstract class KafkaTableSourceBase implements
kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
break;
}
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
return kafkaConsumer;
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index d8eb011..218e55d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -67,6 +67,7 @@ import java.util.Optional;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@@ -169,6 +170,26 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
actualKafkaSource.getDataStream(mock);
assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+ assertTrue(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
+
+ Properties propsWithoutGroupId = new Properties();
+ propsWithoutGroupId.setProperty("bootstrap.servers", "dummy");
+
+ final KafkaTableSourceBase sourceWithoutGroupId = getExpectedKafkaTableSource(
+ schema,
+ Optional.of(PROC_TIME),
+ rowtimeAttributeDescriptors,
+ fieldMapping,
+ TOPIC,
+ propsWithoutGroupId,
+ deserializationSchema,
+ StartupMode.LATEST,
+ new HashMap<>(),
+ 0L);
+
+ sourceWithoutGroupId.getDataStream(mock);
+ assertTrue(mock.sourceFunction instanceof FlinkKafkaConsumerBase);
+ assertFalse(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
}
@Test