You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/07 11:46:52 UTC

[flink] 01/02: [FLINK-25510][Connectors / Kafka][tests] Add using committed offsets test cases for KafkaPartitionSplitReader

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d42462b21b93aa475ce9130ab5d26d57b1902027
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Wed Jan 5 10:13:50 2022 +0800

    [FLINK-25510][Connectors / Kafka][tests] Add using committed offsets test cases for KafkaPartitionSplitReader
---
 .../reader/KafkaPartitionSplitReaderTest.java      | 99 ++++++++++++++++++++++
 1 file changed, 99 insertions(+)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index 864d8e9..e014d6d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -38,11 +38,16 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -248,6 +253,100 @@ public class KafkaPartitionSplitReaderTest {
         assertTrue(recordsWithSplitIds.finishedSplits().isEmpty());
     }
 
+    /**
+     * Verifies fail-fast behavior when a split starts from {@link
+     * KafkaPartitionSplit#COMMITTED_OFFSET} but the consumer group has never committed an offset.
+     *
+     * <p>No "auto.offset.reset" override is supplied here, so the reader resolves the starting
+     * position with the NONE reset strategy (per the test name — presumably established by
+     * {@code createReader(...)}; verify against that helper). With no committed offset and no
+     * reset policy, the underlying Kafka consumer throws, and the test pins the exception type
+     * and message rather than letting the reader hang or silently skip the split.
+     */
+    @Test
+    public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() {
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+        // Add a committed offset split and catch kafka exception
+        final KafkaException undefinedOffsetException =
+                Assertions.assertThrows(
+                        KafkaException.class,
+                        () ->
+                                reader.handleSplitsChanges(
+                                        new SplitsAddition<>(
+                                                Collections.singletonList(
+                                                        new KafkaPartitionSplit(
+                                                                new TopicPartition(TOPIC1, 0),
+                                                                KafkaPartitionSplit
+                                                                        .COMMITTED_OFFSET)))));
+        // Matches a substring of Kafka's NoOffsetForPartitionException message
+        // ("Undefined offset with no reset policy for partitions: ...").
+        MatcherAssert.assertThat(
+                undefinedOffsetException.getMessage(),
+                CoreMatchers.containsString("Undefined offset with no reset policy for partition"));
+    }
+
+    /**
+     * Reads a split starting from {@link KafkaPartitionSplit#COMMITTED_OFFSET} while
+     * "auto.offset.reset" is set to "earliest". Since the group has no committed offset, the
+     * consumer falls back to the earliest offset, so the whole partition becomes pending.
+     *
+     * <p>With "max.poll.records" pinned to 1, each {@code fetch()} drains exactly one record,
+     * which lets the test validate the lazily registered pendingRecords gauge step by step.
+     */
+    @Test
+    public void testUsingCommittedOffsetsWithEarliestOffsetResetStrategy() throws Throwable {
+        MetricListener metricListener = new MetricListener();
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                // Locale.ROOT: a bare toLowerCase() would turn "EARLIEST" into "earlıest"
+                // (dotless ı) under a Turkish default locale, yielding an invalid config value.
+                OffsetResetStrategy.EARLIEST.name().toLowerCase(java.util.Locale.ROOT));
+        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG,
+                "using-committed-offset-with-earliest-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        props,
+                        InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
+        // Add a committed offset split
+        reader.handleSplitsChanges(
+                new SplitsAddition<>(
+                        Collections.singletonList(
+                                new KafkaPartitionSplit(
+                                        new TopicPartition(TOPIC1, 0),
+                                        KafkaPartitionSplit.COMMITTED_OFFSET))));
+        // pendingRecords should have not been registered because of lazily registration
+        assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
+        // Trigger first fetch, which registers the gauge lazily.
+        reader.fetch();
+        final Optional<Gauge<Long>> pendingRecords =
+                metricListener.getGauge(MetricNames.PENDING_RECORDS);
+        // isPresent() fully covers presence; the former assertNotNull(pendingRecords) only
+        // re-checked the Optional reference itself and was dropped as redundant.
+        assertTrue(pendingRecords.isPresent());
+        // One record was consumed by the first fetch; the rest of the partition is pending.
+        assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue());
+        // Each subsequent fetch returns one record (max.poll.records = 1), so the gauge
+        // decrements by exactly one per fetch until the partition is drained.
+        for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) {
+            reader.fetch();
+            assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
+        }
+    }
+
+    /**
+     * Reads splits starting from {@link KafkaPartitionSplit#COMMITTED_OFFSET} while
+     * "auto.offset.reset" is set to "latest". A split whose stopping offset is the latest offset
+     * is then empty (start == stop after the reset), so the reader must report it as finished on
+     * the first fetch instead of blocking on it; a second, unbounded split is added alongside to
+     * make sure a finished split is detected even when other splits remain active.
+     */
+    @Test
+    public void testUsingCommittedOffsetsWithLatestOffsetResetStrategy() throws Throwable {
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                // Locale.ROOT keeps the config value locale-independent, mirroring the
+                // earliest-offset sibling test.
+                OffsetResetStrategy.LATEST.name().toLowerCase(java.util.Locale.ROOT));
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-latest-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+        // Add empty latest offset reset split
+        final KafkaPartitionSplit latestOffsetResetEmptySplit =
+                new KafkaPartitionSplit(
+                        new TopicPartition(TOPIC1, 0),
+                        KafkaPartitionSplit.COMMITTED_OFFSET,
+                        KafkaPartitionSplit.LATEST_OFFSET);
+        final KafkaPartitionSplit latestOffsetResetNormalSplit =
+                new KafkaPartitionSplit(
+                        new TopicPartition(TOPIC2, 0), KafkaPartitionSplit.COMMITTED_OFFSET);
+
+        reader.handleSplitsChanges(
+                new SplitsAddition<>(
+                        Arrays.asList(latestOffsetResetEmptySplit, latestOffsetResetNormalSplit)));
+
+        // Fetch and check latest offset reset split is added to finished splits
+        RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch();
+        assertTrue(
+                recordsWithSplitIds
+                        .finishedSplits()
+                        .contains(latestOffsetResetEmptySplit.splitId()));
+    }
+
     // ------------------
 
     private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, int readerId)