Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/06 07:43:18 UTC

[GitHub] [flink] ashulin commented on a change in pull request #18266: [FLINK-25510][Connectors / Kafka][tests] Add using committed offsets test cases for KafkaPartitionSplitReader

ashulin commented on a change in pull request #18266:
URL: https://github.com/apache/flink/pull/18266#discussion_r779354166



##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
##########
@@ -248,6 +253,100 @@ public void testAssignEmptySplit() throws Exception {
         assertTrue(recordsWithSplitIds.finishedSplits().isEmpty());
     }
 
+    @Test
+    public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() {
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+        // Add a committed offset split and catch kafka exception
+        final KafkaException undefinedOffsetException =
+                Assertions.assertThrows(
+                        KafkaException.class,
+                        () ->
+                                reader.handleSplitsChanges(
+                                        new SplitsAddition<>(
+                                                Collections.singletonList(
+                                                        new KafkaPartitionSplit(
+                                                                new TopicPartition(TOPIC1, 0),
+                                                                KafkaPartitionSplit
+                                                                        .COMMITTED_OFFSET)))));
+        MatcherAssert.assertThat(
+                undefinedOffsetException.getMessage(),
+                CoreMatchers.containsString("Undefined offset with no reset policy for partition"));
+    }
+
+    @Test
+    public void testUsingCommittedOffsetsWithEarliestOffsetResetStrategy() throws Throwable {
+        MetricListener metricListener = new MetricListener();
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.name().toLowerCase());
+        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG,
+                "using-committed-offset-with-earliest-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        props,
+                        InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
+        // Add a committed offset split
+        reader.handleSplitsChanges(
+                new SplitsAddition<>(
+                        Collections.singletonList(
+                                new KafkaPartitionSplit(
+                                        new TopicPartition(TOPIC1, 0),
+                                        KafkaPartitionSplit.COMMITTED_OFFSET))));
+        // pendingRecords should not have been registered yet because registration is lazy
+        assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
+        // Trigger first fetch
+        reader.fetch();
+        final Optional<Gauge<Long>> pendingRecords =
+                metricListener.getGauge(MetricNames.PENDING_RECORDS);
+        assertTrue(pendingRecords.isPresent());
+        // Validate pendingRecords
+        assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue());
+        for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) {
+            reader.fetch();
+            assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
+        }
+    }
+
+    @Test
+    public void testUsingCommittedOffsetsWithLatestOffsetResetStrategy() throws Throwable {
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.LATEST.name().toLowerCase());
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-latest-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+        // Add empty latest offset reset split
+        final KafkaPartitionSplit latestOffsetResetEmptySplit =
+                new KafkaPartitionSplit(
+                        new TopicPartition(TOPIC1, 0),
+                        KafkaPartitionSplit.COMMITTED_OFFSET,
+                        KafkaPartitionSplit.LATEST_OFFSET);
+        final KafkaPartitionSplit latestOffsetResetNormalSplit =
+                new KafkaPartitionSplit(
+                        new TopicPartition(TOPIC2, 0), KafkaPartitionSplit.COMMITTED_OFFSET);
+
+        reader.handleSplitsChanges(
+                new SplitsAddition<>(
+                        Arrays.asList(latestOffsetResetEmptySplit, latestOffsetResetNormalSplit)));
+
+        // Fetch and check latest offset reset split is added to finished splits
+        RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch();
+        assertTrue(
+                recordsWithSplitIds
+                        .finishedSplits()
+                        .contains(latestOffsetResetEmptySplit.splitId()));

Review comment:
       `latestOffsetResetEmptySplit` will be added to the finished splits, but if there are no unfinished splits left, the `consumer.poll()` call inside `reader.fetch()` will throw an exception.
   I also could not find a way to retrieve the finished splits without calling `reader.fetch()`, so the test is written this way.
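The constraint described above can be modeled with a small, self-contained sketch. This is plain Java, not the Flink/Kafka API: `ToySplitReader`, its `addSplits`/`fetch` methods, and the `-empty` split naming are all invented for illustration. It mimics why the test pairs the immediately-finished "empty" split with a normal one: finished splits are only reported as a side effect of fetching, and fetching with nothing assigned fails (like `consumer.poll()` on an empty assignment).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (NOT the Flink connector API) of a split reader whose fetch()
// reports finished splits but refuses to run with an empty assignment.
public class ToySplitReader {
    private final List<String> pending = new ArrayList<>();

    public void addSplits(List<String> splits) {
        pending.addAll(splits);
    }

    // Returns the splits that finished during this fetch; throws if there is
    // nothing left to poll, mimicking consumer.poll() with no assignment.
    public Set<String> fetch() {
        if (pending.isEmpty()) {
            throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
        }
        Set<String> finished = new HashSet<>();
        // Splits whose start offset already equals their stopping offset
        // (like COMMITTED_OFFSET resolving to LATEST_OFFSET on an empty
        // partition) finish immediately; here "-empty" stands in for that.
        pending.removeIf(split -> {
            boolean done = split.endsWith("-empty");
            if (done) {
                finished.add(split);
            }
            return done;
        });
        return finished;
    }

    public static void main(String[] args) {
        ToySplitReader reader = new ToySplitReader();
        reader.addSplits(Arrays.asList("topic1-0-empty", "topic2-0"));
        // The empty split is only observed as finished because the normal
        // split keeps the fetch legal.
        System.out.println(reader.fetch().contains("topic1-0-empty")); // prints "true"
    }
}
```

If only `"topic1-0-empty"` were assigned, the second `fetch()` would hit the empty-assignment exception, which is the situation the reviewer is pointing out.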




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org