You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2020/05/19 14:34:50 UTC

[flink] branch release-1.11 updated: [FLINK-17619] Disable commit on checkpoints if no group.id was specified for Kafka table source

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new cf29c2c  [FLINK-17619] Disable commit on checkpoints if no group.id was specified for Kafka table source
cf29c2c is described below

commit cf29c2c9c21eba72535a5eb86d6f28de2803c0ee
Author: Gyula Fora <gy...@cloudera.com>
AuthorDate: Wed May 13 11:23:50 2020 +0200

    [FLINK-17619] Disable commit on checkpoints if no group.id was specified for Kafka table source
    
    Closes #12250
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java    |  5 +++++
 .../connectors/kafka/KafkaTableSourceBase.java      |  1 +
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java  | 21 +++++++++++++++++++++
 3 files changed, 27 insertions(+)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index f9f835a..84057b0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -1114,6 +1114,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		return pendingOffsetsToCommit;
 	}
 
+	@VisibleForTesting
+	boolean getEnableCommitOnCheckpoints() {
+		return enableCommitOnCheckpoints;
+	}
+
 	/**
 	 * Creates state serializer for kafka topic partition to offset tuple.
 	 * Using of the explicit state serializer with KryoSerializer is needed because otherwise
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
index d6195e6..ef4051a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
@@ -291,6 +291,7 @@ public abstract class KafkaTableSourceBase implements
 				kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
 				break;
 		}
+		kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
 		return kafkaConsumer;
 	}
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index d8eb011..218e55d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -67,6 +67,7 @@ import java.util.Optional;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -169,6 +170,26 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 		final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
 		actualKafkaSource.getDataStream(mock);
 		assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+		assertTrue(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
+
+		Properties propsWithoutGroupId = new Properties();
+		propsWithoutGroupId.setProperty("bootstrap.servers", "dummy");
+
+		final KafkaTableSourceBase sourceWithoutGroupId = getExpectedKafkaTableSource(
+			schema,
+			Optional.of(PROC_TIME),
+			rowtimeAttributeDescriptors,
+			fieldMapping,
+			TOPIC,
+			propsWithoutGroupId,
+			deserializationSchema,
+			StartupMode.LATEST,
+			new HashMap<>(),
+			0L);
+
+		sourceWithoutGroupId.getDataStream(mock);
+		assertTrue(mock.sourceFunction instanceof FlinkKafkaConsumerBase);
+		assertFalse(((FlinkKafkaConsumerBase) mock.sourceFunction).getEnableCommitOnCheckpoints());
 	}
 
 	@Test