You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/07 11:46:53 UTC
[flink] 02/02: [FLINK-25510][Connectors / Kafka][tests] Update the validation method and add comments
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e615106b38a289bc624a8554b86c83f9785352d3
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Jan 6 16:31:54 2022 +0800
[FLINK-25510][Connectors / Kafka][tests] Update the validation method and add comments
---
.../source/reader/KafkaPartitionSplitReader.java | 6 ++
.../reader/KafkaPartitionSplitReaderTest.java | 80 +++++-----------------
2 files changed, 23 insertions(+), 63 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index ebadef3..ee7183d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.kafka.source.reader;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -220,6 +221,11 @@ public class KafkaPartitionSplitReader
consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
}
+ @VisibleForTesting
+ KafkaConsumer<byte[], byte[]> consumer() {
+ return consumer;
+ }
+
// --------------- private helper method ----------------------
private void parseStartingOffsets(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index e014d6d..7aecc33 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -51,6 +50,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EmptySource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -260,7 +260,10 @@ public class KafkaPartitionSplitReaderTest {
ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");
KafkaPartitionSplitReader reader =
createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
- // Add a committed offset split and catch kafka exception
+ // We expect a committed offset to exist, but the group does not actually have a
+ // committed offset, and the offset reset strategy is none (throw an exception to the
+ // consumer if no previous offset is found for the consumer's group),
+ // so an exception reporting the missing committed offset is expected.
final KafkaException undefinedOffsetException =
Assertions.assertThrows(
KafkaException.class,
@@ -277,74 +280,25 @@ public class KafkaPartitionSplitReaderTest {
CoreMatchers.containsString("Undefined offset with no reset policy for partition"));
}
- @Test
- public void testUsingCommittedOffsetsWithEarliestOffsetResetStrategy() throws Throwable {
- MetricListener metricListener = new MetricListener();
+ @ParameterizedTest
+ @CsvSource({"earliest, 0", "latest, 10"})
+ public void testUsingCommittedOffsetsWithEarliestOrLatestOffsetResetStrategy(
+ String offsetResetStrategy, Long expectedOffset) {
final Properties props = new Properties();
- props.setProperty(
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
- OffsetResetStrategy.EARLIEST.name().toLowerCase());
- props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
- props.setProperty(
- ConsumerConfig.GROUP_ID_CONFIG,
- "using-committed-offset-with-earliest-offset-reset");
+ props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetStrategy);
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset");
KafkaPartitionSplitReader reader =
- createReader(
- props,
- InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
- // Add a committed offset split
+ createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+ // Add committed offset split
+ final TopicPartition partition = new TopicPartition(TOPIC1, 0);
reader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
new KafkaPartitionSplit(
- new TopicPartition(TOPIC1, 0),
- KafkaPartitionSplit.COMMITTED_OFFSET))));
- // pendingRecords should have not been registered because of lazily registration
- assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
- // Trigger first fetch
- reader.fetch();
- final Optional<Gauge<Long>> pendingRecords =
- metricListener.getGauge(MetricNames.PENDING_RECORDS);
- assertTrue(pendingRecords.isPresent());
- // Validate pendingRecords
- assertNotNull(pendingRecords);
- assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue());
- for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) {
- reader.fetch();
- assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
- }
- }
-
- @Test
- public void testUsingCommittedOffsetsWithLatestOffsetResetStrategy() throws Throwable {
- final Properties props = new Properties();
- props.setProperty(
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
- OffsetResetStrategy.LATEST.name().toLowerCase());
- props.setProperty(
- ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-latest-offset-reset");
- KafkaPartitionSplitReader reader =
- createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
- // Add empty latest offset reset split
- final KafkaPartitionSplit latestOffsetResetEmptySplit =
- new KafkaPartitionSplit(
- new TopicPartition(TOPIC1, 0),
- KafkaPartitionSplit.COMMITTED_OFFSET,
- KafkaPartitionSplit.LATEST_OFFSET);
- final KafkaPartitionSplit latestOffsetResetNormalSplit =
- new KafkaPartitionSplit(
- new TopicPartition(TOPIC2, 0), KafkaPartitionSplit.COMMITTED_OFFSET);
+ partition, KafkaPartitionSplit.COMMITTED_OFFSET))));
- reader.handleSplitsChanges(
- new SplitsAddition<>(
- Arrays.asList(latestOffsetResetEmptySplit, latestOffsetResetNormalSplit)));
-
- // Fetch and check latest offset reset split is added to finished splits
- RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch();
- assertTrue(
- recordsWithSplitIds
- .finishedSplits()
- .contains(latestOffsetResetEmptySplit.splitId()));
+ // Verify that the current offset of the consumer is the expected offset
+ assertEquals(expectedOffset, reader.consumer().position(partition));
}
// ------------------