You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/06/18 03:41:03 UTC

[kafka] branch 2.5 updated: KAFKA-10123; Fix incorrect value for AWAIT_RESET#hasPosition (#8841)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 8202acc  KAFKA-10123; Fix incorrect value for AWAIT_RESET#hasPosition (#8841)
8202acc is described below

commit 8202acc0e8c0951bb567264bbc89bc2a4e601435
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Jun 17 23:23:45 2020 -0400

    KAFKA-10123; Fix incorrect value for AWAIT_RESET#hasPosition (#8841)
    
    When a partition subscription is initialized, it has a `null` position and is in the INITIALIZING state. Depending on the consumer, it will then transition to one of the other states. Typically a consumer will either reset the offset to earliest/latest or provide an offset (with or without offset metadata). In the reset case, there is still no position to act on, so fetches should not occur.
    
    Recently we made changes for KAFKA-9724 (#8376) to prevent clients from entering the AWAIT_VALIDATION state when targeting older brokers. The logic added in that change to bypass offset validation exposed this issue.
    
    In the partition subscriptions, the AWAIT_RESET state was incorrectly reporting that it had a position. In some cases a position might actually exist (e.g., if we were resetting offsets during a fetch after a truncation), but in the initialization case no position had been set. We saw this issue in system tests where there is a race between the offset reset completing and the first fetch request being issued.
    
    Since AWAIT_RESET#hasPosition was incorrectly returning `true`, the new logic to bypass offset validation was transitioning the subscription to FETCHING (even though no position existed).
    
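    The fix was simply to have AWAIT_RESET#hasPosition return `false`, which should have been the case from the start. (In the code, the FetchState method is renamed to `requiresPosition`, and whether a partition has a position is now determined by checking that its position is non-null.) Additionally, this fix includes some guards against NPEs when reading the position from the subscription.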
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  2 +-
 .../consumer/internals/ConsumerCoordinator.java    |  4 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 42 ++++++++++------
 .../consumer/internals/SubscriptionState.java      | 57 +++++++++++++++++-----
 .../internals/ConsumerCoordinatorTest.java         | 20 ++++----
 .../clients/consumer/internals/FetcherTest.java    | 31 +++++++++++-
 .../consumer/internals/SubscriptionStateTest.java  | 54 +++++++++++++++++++-
 7 files changed, 167 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 20da83f..145dd05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2389,7 +2389,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         // If there are partitions still needing a position and a reset policy is defined,
         // request reset using the default policy. If no reset strategy is defined and there
         // are partitions with a missing position, then we will raise an exception.
-        subscriptions.resetMissingPositions();
+        subscriptions.resetInitializingPositions();
 
         // Finally send an asynchronous request to lookup and update the positions of any
         // partitions which are awaiting reset.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index d4a399f..2978611 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -767,9 +767,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * @return true iff the operation completed within the timeout
      */
     public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
-        final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
+        final Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
 
-        final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, timer);
+        final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(initializingPartitions, timer);
         if (offsets == null) return false;
 
         for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ea1b7e7..4bfa627 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
@@ -205,7 +206,7 @@ public class Fetcher<K, V> implements Closeable {
     /**
      * Represents data about an offset returned by a broker.
      */
-    private static class ListOffsetData {
+    static class ListOffsetData {
         final long offset;
         final Long timestamp; //  null if the broker does not support returning timestamps
         final Optional<Integer> leaderEpoch; // empty if the leader epoch is not known
@@ -490,9 +491,10 @@ public class Fetcher<K, V> implements Closeable {
         });
 
         // Collect positions needing validation, with backoff
-        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = subscriptions
+        Map<TopicPartition, FetchPosition> partitionsToValidate = subscriptions
                 .partitionsNeedingValidation(time.milliseconds())
                 .stream()
+                .filter(tp -> subscriptions.position(tp) != null)
                 .collect(Collectors.toMap(Function.identity(), subscriptions::position));
 
         validateOffsetsAsync(partitionsToValidate);
@@ -672,12 +674,16 @@ public class Fetcher<K, V> implements Closeable {
             log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
                     completedFetch.partition);
         } else {
-            SubscriptionState.FetchPosition position = subscriptions.position(completedFetch.partition);
+            FetchPosition position = subscriptions.position(completedFetch.partition);
+            if (position == null) {
+                throw new IllegalStateException("Missing position for fetchable partition " + completedFetch.partition);
+            }
+
             if (completedFetch.nextFetchOffset == position.offset) {
                 List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
 
                 if (completedFetch.nextFetchOffset > position.offset) {
-                    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
+                    FetchPosition nextPosition = new FetchPosition(
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
@@ -710,8 +716,9 @@ public class Fetcher<K, V> implements Closeable {
         return emptyList();
     }
 
-    private void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
-        SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
+    // Visible for testing
+    void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
+        FetchPosition position = new FetchPosition(
                 offsetData.offset, offsetData.leaderEpoch, metadata.currentLeader(partition));
         offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch));
         subscriptions.maybeSeekUnvalidated(partition, position.offset, requestedResetStrategy);
@@ -768,8 +775,8 @@ public class Fetcher<K, V> implements Closeable {
      *
      * Requests are grouped by Node for efficiency.
      */
-    private void validateOffsetsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
-        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+    private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsToValidate) {
+        final Map<Node, Map<TopicPartition, FetchPosition>> regrouped =
                 regroupFetchPositionsByLeader(partitionsToValidate);
 
         regrouped.forEach((node, fetchPostitions) -> {
@@ -811,7 +818,7 @@ public class Fetcher<K, V> implements Closeable {
                     // for the partition. If so, it means we have experienced log truncation and need to reposition
                     // that partition's offset.
                     offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
-                        SubscriptionState.FetchPosition requestPosition = fetchPostitions.get(respTopicPartition);
+                        FetchPosition requestPosition = fetchPostitions.get(respTopicPartition);
                         Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
                                 respTopicPartition, requestPosition, respEndOffset);
                         divergentOffsetOpt.ifPresent(divergentOffset -> {
@@ -1106,14 +1113,18 @@ public class Fetcher<K, V> implements Closeable {
         long currentTimeMs = time.milliseconds();
 
         for (TopicPartition partition : fetchablePartitions()) {
-            // Use the preferred read replica if set, or the position's leader
-            SubscriptionState.FetchPosition position = this.subscriptions.position(partition);
+            FetchPosition position = this.subscriptions.position(partition);
+            if (position == null) {
+                throw new IllegalStateException("Missing position for fetchable partition " + partition);
+            }
+
             Optional<Node> leaderOpt = position.currentLeader.leader;
             if (!leaderOpt.isPresent()) {
                 metadata.requestUpdate();
                 continue;
             }
 
+            // Use the preferred read replica if set, otherwise the position's leader
             Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
             if (client.isUnavailable(node)) {
                 client.maybeThrowAuthFailure(node);
@@ -1153,8 +1164,8 @@ public class Fetcher<K, V> implements Closeable {
         return reqs;
     }
 
-    private Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regroupFetchPositionsByLeader(
-            Map<TopicPartition, SubscriptionState.FetchPosition> partitionMap) {
+    private Map<Node, Map<TopicPartition, FetchPosition>> regroupFetchPositionsByLeader(
+            Map<TopicPartition, FetchPosition> partitionMap) {
         return partitionMap.entrySet()
                 .stream()
                 .filter(entry -> entry.getValue().currentLeader.leader.isPresent())
@@ -1186,7 +1197,7 @@ public class Fetcher<K, V> implements Closeable {
             } else if (error == Errors.NONE) {
                 // we are interested in this fetch only if the beginning offset matches the
                 // current consumed position
-                SubscriptionState.FetchPosition position = subscriptions.position(tp);
+                FetchPosition position = subscriptions.position(tp);
                 if (position == null || position.offset != fetchOffset) {
                     log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                             "the expected offset {}", tp, fetchOffset, position);
@@ -1255,7 +1266,8 @@ public class Fetcher<K, V> implements Closeable {
                 Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
                 if (!clearedReplicaId.isPresent()) {
                     // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
-                    if (fetchOffset != subscriptions.position(tp).offset) {
+                    FetchPosition position = subscriptions.position(tp);
+                    if (position == null || fetchOffset != position.offset) {
                         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                                 "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
                     } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index d90b2ed..c32616d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -453,7 +453,7 @@ public class SubscriptionState {
                 return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
             } else {
                 // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
-                completeValidation(tp);
+                assignedState(tp).updatePositionLeaderNoValidation(leaderAndEpoch);
                 return false;
             }
         } else {
@@ -629,8 +629,8 @@ public class SubscriptionState {
         return assignment.stream().allMatch(state -> state.value().hasValidPosition());
     }
 
-    public synchronized Set<TopicPartition> missingFetchPositions() {
-        return collectPartitions(state -> !state.hasPosition(), Collectors.toSet());
+    public synchronized Set<TopicPartition> initializingPartitions() {
+        return collectPartitions(state -> state.fetchState.equals(FetchStates.INITIALIZING), Collectors.toSet());
     }
 
     private <T extends Collection<TopicPartition>> T collectPartitions(Predicate<TopicPartitionState> filter, Collector<TopicPartition, ?, T> collector) {
@@ -641,12 +641,12 @@ public class SubscriptionState {
     }
 
 
-    public synchronized void resetMissingPositions() {
+    public synchronized void resetInitializingPositions() {
         final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
         assignment.stream().forEach(state -> {
             TopicPartition tp = state.topicPartition();
             TopicPartitionState partitionState = state.value();
-            if (!partitionState.hasPosition()) {
+            if (partitionState.fetchState.equals(FetchStates.INITIALIZING)) {
                 if (defaultResetStrategy == OffsetResetStrategy.NONE)
                     partitionsWithNoOffsets.add(tp);
                 else
@@ -744,6 +744,11 @@ public class SubscriptionState {
             if (nextState.equals(newState)) {
                 this.fetchState = nextState;
                 runIfTransitioned.run();
+                if (this.position == null && nextState.requiresPosition()) {
+                    throw new IllegalStateException("Transitioned subscription state to " + nextState + ", but position is null");
+                } else if (!nextState.requiresPosition()) {
+                    this.position = null;
+                }
             }
         }
 
@@ -781,6 +786,13 @@ public class SubscriptionState {
             });
         }
 
+        /**
+         * Check if the position exists and needs to be validated. If so, enter the AWAIT_VALIDATION state. This method
+         * also will update the position with the current leader and epoch.
+         *
+         * @param currentLeaderAndEpoch leader and epoch to compare the offset with
+         * @return true if the position is now awaiting validation
+         */
         private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEpoch) {
             if (this.fetchState.equals(FetchStates.AWAIT_RESET)) {
                 return false;
@@ -798,6 +810,18 @@ public class SubscriptionState {
             return this.fetchState.equals(FetchStates.AWAIT_VALIDATION);
         }
 
+        /**
+         * For older versions of the API, we cannot perform offset validation so we simply transition directly to FETCHING
+         */
+        private void updatePositionLeaderNoValidation(Metadata.LeaderAndEpoch currentLeaderAndEpoch) {
+            if (position != null) {
+                transitionState(FetchStates.FETCHING, () -> {
+                    this.position = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);
+                    this.nextRetryTimeMs = null;
+                });
+            }
+        }
+
         private void validatePosition(FetchPosition position) {
             if (position.offsetEpoch.isPresent() && position.currentLeader.epoch.isPresent()) {
                 transitionState(FetchStates.AWAIT_VALIDATION, () -> {
@@ -849,7 +873,7 @@ public class SubscriptionState {
         }
 
         private boolean hasPosition() {
-            return fetchState.hasPosition();
+            return position != null;
         }
 
         private boolean isPaused() {
@@ -925,10 +949,19 @@ public class SubscriptionState {
             }
         }
 
+        /**
+         * Return the valid states which this state can transition to
+         */
         Collection<FetchState> validTransitions();
 
-        boolean hasPosition();
+        /**
+         * Test if this state requires a position to be set
+         */
+        boolean requiresPosition();
 
+        /**
+         * Test if this state is considered to have a valid position which can be used for fetching
+         */
         boolean hasValidPosition();
     }
 
@@ -944,7 +977,7 @@ public class SubscriptionState {
             }
 
             @Override
-            public boolean hasPosition() {
+            public boolean requiresPosition() {
                 return false;
             }
 
@@ -961,7 +994,7 @@ public class SubscriptionState {
             }
 
             @Override
-            public boolean hasPosition() {
+            public boolean requiresPosition() {
                 return true;
             }
 
@@ -978,8 +1011,8 @@ public class SubscriptionState {
             }
 
             @Override
-            public boolean hasPosition() {
-                return true;
+            public boolean requiresPosition() {
+                return false;
             }
 
             @Override
@@ -995,7 +1028,7 @@ public class SubscriptionState {
             }
 
             @Override
-            public boolean hasPosition() {
+            public boolean requiresPosition() {
                 return true;
             }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 0ac1e69..f238267 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2069,7 +2069,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(100L, subscriptions.position(t1p).offset);
     }
@@ -2091,7 +2091,7 @@ public class ConsumerCoordinatorTest {
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
         // Offset gets loaded, but requires validation
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertFalse(subscriptions.hasAllFetchPositions());
         assertTrue(subscriptions.awaitingValidation(t1p));
         assertEquals(subscriptions.position(t1p).offset, 100L);
@@ -2142,7 +2142,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(100L, subscriptions.position(t1p).offset);
     }
@@ -2170,12 +2170,12 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(singleton(t1p));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.UNSTABLE_OFFSET_COMMIT, "", -1L));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
-        assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions());
+        assertEquals(Collections.singleton(t1p), subscriptions.initializingPartitions());
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(0L));
-        assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions());
+        assertEquals(Collections.singleton(t1p), subscriptions.initializingPartitions());
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(0L));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(100L, subscriptions.position(t1p).offset);
     }
@@ -2201,7 +2201,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(100L, subscriptions.position(t1p).offset);
     }
@@ -2215,7 +2215,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", -1L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions());
+        assertEquals(Collections.singleton(t1p), subscriptions.initializingPartitions());
         assertEquals(Collections.emptySet(), subscriptions.partitionsNeedingReset(time.milliseconds()));
         assertFalse(subscriptions.hasAllFetchPositions());
         assertNull(subscriptions.position(t1p));
@@ -2229,7 +2229,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.seek(t1p, 500L);
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(500L, subscriptions.position(t1p).offset);
         assertTrue(coordinator.coordinatorUnknown());
@@ -2243,7 +2243,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.requestOffsetReset(t1p, OffsetResetStrategy.EARLIEST);
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertFalse(subscriptions.hasAllFetchPositions());
         assertEquals(Collections.singleton(t1p), subscriptions.partitionsNeedingReset(time.milliseconds()));
         assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(t1p));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d85df27..b6c4ca1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1183,7 +1183,7 @@ public class FetcherTest {
         assertEquals(0, fetcher.fetchedRecords().size());
         assertTrue(subscriptions.isOffsetResetNeeded(tp0));
         assertNull(subscriptions.validPosition(tp0));
-        assertNotNull(subscriptions.position(tp0));
+        assertNull(subscriptions.position(tp0));
     }
 
     @Test
@@ -3852,6 +3852,35 @@ public class FetcherTest {
     }
 
     @Test
+    public void testSkipValidationForOlderApiVersion() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(tp0.topic(), 4);
+
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
+
+        // Start with metadata, epoch=1
+        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+                Collections.emptyMap(), partitionCounts, tp -> 1), false, 0L);
+
+        // Request offset reset
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // Since we have no position due to reset, no fetch is sent
+        assertEquals(0, fetcher.sendFetches());
+
+        // Still no position, ensure offset validation logic did not transition us to FETCHING state
+        assertEquals(0, fetcher.sendFetches());
+
+        // Complete reset and now we can fetch
+        fetcher.resetOffsetIfNeeded(tp0, OffsetResetStrategy.LATEST,
+                new Fetcher.ListOffsetData(100, 1L, Optional.empty()));
+        assertEquals(1, fetcher.sendFetches());
+    }
+
+    @Test
     public void testTruncationDetected() {
         // Create some records that include a leader epoch (1)
         MemoryRecordsBuilder builder = MemoryRecords.builder(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index ae00b8e..c7071b6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -42,7 +42,7 @@ import java.util.regex.Pattern;
 import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class SubscriptionStateTest {
@@ -210,7 +210,7 @@ public class SubscriptionStateTest {
         state.requestOffsetReset(tp0);
         assertFalse(state.isFetchable(tp0));
         assertTrue(state.isOffsetResetNeeded(tp0));
-        assertNotNull(state.position(tp0));
+        assertNull(state.position(tp0));
 
         // seek should clear the reset and make the partition fetchable
         state.seek(tp0, 0);
@@ -600,6 +600,7 @@ public class SubscriptionStateTest {
         assertEquals(Optional.empty(), divergentOffsetMetadataOpt);
         assertFalse(state.awaitingValidation(tp0));
         assertTrue(state.isOffsetResetNeeded(tp0));
+        assertNull(state.position(tp0));
     }
 
     @Test
@@ -672,4 +673,53 @@ public class SubscriptionStateTest {
 
     }
 
+    @Test
+    public void resetOffsetNoValidation() {
+        // Check that offset reset works when we can't validate offsets (older brokers)
+
+        Node broker1 = new Node(1, "localhost", 9092);
+        state.assignFromUser(Collections.singleton(tp0));
+
+        // Reset offsets
+        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+
+        // Attempt to validate with older API version, should do nothing
+        ApiVersions oldApis = new ApiVersions();
+        oldApis.update("1", NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(oldApis, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
+        assertFalse(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertTrue(state.isOffsetResetNeeded(tp0));
+
+        // Complete the reset via unvalidated seek
+        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L));
+        assertTrue(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertFalse(state.isOffsetResetNeeded(tp0));
+
+        // Next call to validate offsets does nothing
+        assertFalse(state.maybeValidatePositionForCurrentLeader(oldApis, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
+        assertTrue(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertFalse(state.isOffsetResetNeeded(tp0));
+
+        // Reset again, and complete it with a seek that would normally require validation
+        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(10), new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(2))));
+        // We are now in AWAIT_VALIDATION
+        assertFalse(state.hasValidPosition(tp0));
+        assertTrue(state.awaitingValidation(tp0));
+        assertFalse(state.isOffsetResetNeeded(tp0));
+
+        // Now ensure next call to validate clears the validation state
+        assertFalse(state.maybeValidatePositionForCurrentLeader(oldApis, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(2))));
+        assertTrue(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertFalse(state.isOffsetResetNeeded(tp0));
+    }
+
 }