Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/07 11:46:53 UTC

[flink] 02/02: [FLINK-25510][Connectors / Kafka][tests] Update the validation method and add comments

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e615106b38a289bc624a8554b86c83f9785352d3
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Jan 6 16:31:54 2022 +0800

    [FLINK-25510][Connectors / Kafka][tests] Update the validation method and add comments
---
 .../source/reader/KafkaPartitionSplitReader.java   |  6 ++
 .../reader/KafkaPartitionSplitReaderTest.java      | 80 +++++-----------------
 2 files changed, 23 insertions(+), 63 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index ebadef3..ee7183d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.source.reader;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -220,6 +221,11 @@ public class KafkaPartitionSplitReader
         consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
     }
 
+    @VisibleForTesting
+    KafkaConsumer<byte[], byte[]> consumer() {
+        return consumer;
+    }
+
     // --------------- private helper method ----------------------
 
     private void parseStartingOffsets(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index e014d6d..7aecc33 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -51,6 +50,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.EmptySource;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -260,7 +260,10 @@ public class KafkaPartitionSplitReaderTest {
                 ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");
         KafkaPartitionSplitReader reader =
                 createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
-        // Add a committed offset split and catch kafka exception
+        // The split starts from the committed offset, but the group has not actually committed
+        // one, and the offset reset strategy is "none" (throw an exception to the consumer if no
+        // previous offset is found for the consumer's group), so fetching is expected to fail
+        // with an exception about the missing committed offset.
         final KafkaException undefinedOffsetException =
                 Assertions.assertThrows(
                         KafkaException.class,
@@ -277,74 +280,25 @@ public class KafkaPartitionSplitReaderTest {
                 CoreMatchers.containsString("Undefined offset with no reset policy for partition"));
     }
 
-    @Test
-    public void testUsingCommittedOffsetsWithEarliestOffsetResetStrategy() throws Throwable {
-        MetricListener metricListener = new MetricListener();
+    @ParameterizedTest
+    @CsvSource({"earliest, 0", "latest, 10"})
+    public void testUsingCommittedOffsetsWithEarliestOrLatestOffsetResetStrategy(
+            String offsetResetStrategy, Long expectedOffset) {
         final Properties props = new Properties();
-        props.setProperty(
-                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                OffsetResetStrategy.EARLIEST.name().toLowerCase());
-        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
-        props.setProperty(
-                ConsumerConfig.GROUP_ID_CONFIG,
-                "using-committed-offset-with-earliest-offset-reset");
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetStrategy);
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset");
         KafkaPartitionSplitReader reader =
-                createReader(
-                        props,
-                        InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
-        // Add a committed offset split
+                createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+        // Add a split that starts from the committed offset
+        final TopicPartition partition = new TopicPartition(TOPIC1, 0);
         reader.handleSplitsChanges(
                 new SplitsAddition<>(
                         Collections.singletonList(
                                 new KafkaPartitionSplit(
-                                        new TopicPartition(TOPIC1, 0),
-                                        KafkaPartitionSplit.COMMITTED_OFFSET))));
-        // pendingRecords should have not been registered because of lazily registration
-        assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
-        // Trigger first fetch
-        reader.fetch();
-        final Optional<Gauge<Long>> pendingRecords =
-                metricListener.getGauge(MetricNames.PENDING_RECORDS);
-        assertTrue(pendingRecords.isPresent());
-        // Validate pendingRecords
-        assertNotNull(pendingRecords);
-        assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue());
-        for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) {
-            reader.fetch();
-            assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
-        }
-    }
-
-    @Test
-    public void testUsingCommittedOffsetsWithLatestOffsetResetStrategy() throws Throwable {
-        final Properties props = new Properties();
-        props.setProperty(
-                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-                OffsetResetStrategy.LATEST.name().toLowerCase());
-        props.setProperty(
-                ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-latest-offset-reset");
-        KafkaPartitionSplitReader reader =
-                createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
-        // Add empty latest offset reset split
-        final KafkaPartitionSplit latestOffsetResetEmptySplit =
-                new KafkaPartitionSplit(
-                        new TopicPartition(TOPIC1, 0),
-                        KafkaPartitionSplit.COMMITTED_OFFSET,
-                        KafkaPartitionSplit.LATEST_OFFSET);
-        final KafkaPartitionSplit latestOffsetResetNormalSplit =
-                new KafkaPartitionSplit(
-                        new TopicPartition(TOPIC2, 0), KafkaPartitionSplit.COMMITTED_OFFSET);
+                                        partition, KafkaPartitionSplit.COMMITTED_OFFSET))));
 
-        reader.handleSplitsChanges(
-                new SplitsAddition<>(
-                        Arrays.asList(latestOffsetResetEmptySplit, latestOffsetResetNormalSplit)));
-
-        // Fetch and check latest offset reset split is added to finished splits
-        RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch();
-        assertTrue(
-                recordsWithSplitIds
-                        .finishedSplits()
-                        .contains(latestOffsetResetEmptySplit.splitId()));
+        // Verify that the current offset of the consumer is the expected offset
+        assertEquals(expectedOffset, reader.consumer().position(partition));
     }
 
     // ------------------
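
For context on the updated comment: the "none" reset strategy can be reproduced with a plain
KafkaConsumer, independent of Flink. Below is a minimal standalone sketch (the broker address,
topic name, group id, and class name are placeholders, not taken from this commit). For a group
with no committed offsets and auto.offset.reset=none, resolving the position throws
NoOffsetForPartitionException, a KafkaException whose message contains "Undefined offset with
no reset policy for partition", which is the string the first test asserts on.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class NoneResetStrategyDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder broker; point this at a reachable cluster.
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // A fresh group id, so no committed offsets exist for it.
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fresh-group-without-commits");
            // "none": fail instead of falling back to earliest/latest.
            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
            props.setProperty(
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getName());
            props.setProperty(
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getName());

            TopicPartition partition = new TopicPartition("some-topic", 0); // placeholder topic
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singleton(partition));
                // No committed offset + reset strategy "none": position() throws
                // instead of resetting to the beginning or end of the partition.
                consumer.position(partition);
            } catch (NoOffsetForPartitionException e) {
                // e.g. "Undefined offset with no reset policy for partitions: [some-topic-0]"
                System.out.println("Caught expected exception: " + e.getMessage());
            }
        }
    }

With "earliest" or "latest" instead of "none", the same position() call resolves to 0 or to the
log end offset. That is what the new parameterized test asserts through the @VisibleForTesting
consumer() accessor: given that the suite appears to write 10 records per partition
(NUM_RECORDS_PER_PARTITION in the removed test), the CsvSource pairs "earliest" with expected
offset 0 and "latest" with expected offset 10.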