You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/07 11:46:52 UTC
[flink] 01/02: [FLINK-25510][Connectors / Kafka][tests] Add using committed offsets test cases for KafkaPartitionSplitReader
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d42462b21b93aa475ce9130ab5d26d57b1902027
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Wed Jan 5 10:13:50 2022 +0800
[FLINK-25510][Connectors / Kafka][tests] Add using committed offsets test cases for KafkaPartitionSplitReader
---
.../reader/KafkaPartitionSplitReaderTest.java | 99 ++++++++++++++++++++++
1 file changed, 99 insertions(+)
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index 864d8e9..e014d6d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -38,11 +38,16 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -248,6 +253,100 @@ public class KafkaPartitionSplitReaderTest {
assertTrue(recordsWithSplitIds.finishedSplits().isEmpty());
}
+ @Test
+ public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() {
+ final Properties props = new Properties();
+ props.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");
+ KafkaPartitionSplitReader reader =
+ createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+ // Add a committed offset split and catch kafka exception
+ final KafkaException undefinedOffsetException =
+ Assertions.assertThrows(
+ KafkaException.class,
+ () ->
+ reader.handleSplitsChanges(
+ new SplitsAddition<>(
+ Collections.singletonList(
+ new KafkaPartitionSplit(
+ new TopicPartition(TOPIC1, 0),
+ KafkaPartitionSplit
+ .COMMITTED_OFFSET)))));
+ MatcherAssert.assertThat(
+ undefinedOffsetException.getMessage(),
+ CoreMatchers.containsString("Undefined offset with no reset policy for partition"));
+ }
+
+ @Test
+ public void testUsingCommittedOffsetsWithEarliestOffsetResetStrategy() throws Throwable {
+ MetricListener metricListener = new MetricListener();
+ final Properties props = new Properties();
+ props.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ OffsetResetStrategy.EARLIEST.name().toLowerCase());
+ props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
+ props.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "using-committed-offset-with-earliest-offset-reset");
+ KafkaPartitionSplitReader reader =
+ createReader(
+ props,
+ InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
+ // Add a committed offset split
+ reader.handleSplitsChanges(
+ new SplitsAddition<>(
+ Collections.singletonList(
+ new KafkaPartitionSplit(
+ new TopicPartition(TOPIC1, 0),
+ KafkaPartitionSplit.COMMITTED_OFFSET))));
+ // pendingRecords should have not been registered because of lazily registration
+ assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
+ // Trigger first fetch
+ reader.fetch();
+ final Optional<Gauge<Long>> pendingRecords =
+ metricListener.getGauge(MetricNames.PENDING_RECORDS);
+ assertTrue(pendingRecords.isPresent());
+ // Validate pendingRecords
+ assertNotNull(pendingRecords);
+ assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue());
+ for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) {
+ reader.fetch();
+ assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
+ }
+ }
+
+ @Test
+ public void testUsingCommittedOffsetsWithLatestOffsetResetStrategy() throws Throwable {
+ final Properties props = new Properties();
+ props.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ OffsetResetStrategy.LATEST.name().toLowerCase());
+ props.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-latest-offset-reset");
+ KafkaPartitionSplitReader reader =
+ createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+ // Add empty latest offset reset split
+ final KafkaPartitionSplit latestOffsetResetEmptySplit =
+ new KafkaPartitionSplit(
+ new TopicPartition(TOPIC1, 0),
+ KafkaPartitionSplit.COMMITTED_OFFSET,
+ KafkaPartitionSplit.LATEST_OFFSET);
+ final KafkaPartitionSplit latestOffsetResetNormalSplit =
+ new KafkaPartitionSplit(
+ new TopicPartition(TOPIC2, 0), KafkaPartitionSplit.COMMITTED_OFFSET);
+
+ reader.handleSplitsChanges(
+ new SplitsAddition<>(
+ Arrays.asList(latestOffsetResetEmptySplit, latestOffsetResetNormalSplit)));
+
+ // Fetch and check latest offset reset split is added to finished splits
+ RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch();
+ assertTrue(
+ recordsWithSplitIds
+ .finishedSplits()
+ .contains(latestOffsetResetEmptySplit.splitId()));
+ }
+
// ------------------
private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, int readerId)