Posted to commits@kafka.apache.org by jg...@apache.org on 2020/06/18 03:28:31 UTC

[kafka] branch 2.6 updated: KAFKA-10123; Fix incorrect value for AWAIT_RESET#hasPosition (#8841)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 1f0c847  KAFKA-10123; Fix incorrect value for AWAIT_RESET#hasPosition (#8841)
1f0c847 is described below

commit 1f0c84715375261160eab03bd5d2685cd94aebe1
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Jun 17 23:23:45 2020 -0400

    KAFKA-10123; Fix incorrect value for AWAIT_RESET#hasPosition (#8841)
    
    ## Background
    
    When a partition subscription is initialized, it has a `null` position and is in the INITIALIZING state. Depending on the consumer, it will then transition to one of the other states. Typically a consumer will either reset the offset to earliest/latest, or it will provide an offset (with or without offset metadata). In the reset case, we still have no position to act on, so fetches should not occur.
    
    Recently we made changes for KAFKA-9724 (#8376) to prevent clients from entering the AWAIT_VALIDATION state when targeting older brokers. The logic added in that change to bypass offset validation exposed this issue.
    
    ## Bug and Fix
    
    In the partition subscriptions, the AWAIT_RESET state was incorrectly reporting that it had a position. In some cases a position might actually exist (e.g., if we were resetting offsets during a fetch after a truncation), but in the initialization case no position had been set. We saw this issue in system tests where there is a race between the offset reset completing and the first fetch request being issued.
    
    Since AWAIT_RESET#hasPosition was incorrectly returning `true`, the new logic to bypass offset validation was transitioning the subscription to FETCHING (even though no position existed).
    
    The fix was simply to have AWAIT_RESET#hasPosition return `false`, which should have been the case from the start. Additionally, this fix includes some guards against NPE when reading the position from the subscription.
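
The behavior described in this commit message can be sketched in a few lines of Java. This is only an illustration under simplifying assumptions, not the Kafka implementation: `PositionStateSketch`, `PartitionState`, `transitionTo`, `requestReset`, `completeReset`, `skipValidation` and `isFetchable` are hypothetical names, a `Long` stands in for `FetchPosition`, and the valid-transition checks are omitted. It shows a state that only declares whether it requires a position, `hasPosition()` derived from the stored value, and the validation bypass leaving a partition in AWAIT_RESET when no position exists.

```java
// Illustrative sketch only: simplified, hypothetical names, not the Kafka classes.
class PositionStateSketch {

    enum FetchState {
        INITIALIZING(false),
        FETCHING(true),
        AWAIT_RESET(false),      // no position is required while a reset is pending
        AWAIT_VALIDATION(true);

        final boolean requiresPosition;

        FetchState(boolean requiresPosition) {
            this.requiresPosition = requiresPosition;
        }
    }

    static class PartitionState {
        private FetchState fetchState = FetchState.INITIALIZING;
        private Long position = null; // stand-in for FetchPosition; null means "no position"

        FetchState fetchState() {
            return fetchState;
        }

        // Derived from the stored position rather than from the state constant.
        boolean hasPosition() {
            return position != null;
        }

        boolean isFetchable() {
            return fetchState == FetchState.FETCHING && position != null;
        }

        // Assumption: this sketch checks before mutating, so a rejected
        // transition leaves the object unchanged.
        private void transitionTo(FetchState next, Long newPosition) {
            if (next.requiresPosition && newPosition == null) {
                throw new IllegalStateException("Transition to " + next + " requires a position");
            }
            fetchState = next;
            // States that do not require a position drop any stale one.
            position = next.requiresPosition ? newPosition : null;
        }

        void requestReset() {
            transitionTo(FetchState.AWAIT_RESET, null);
        }

        void completeReset(long offset) {
            transitionTo(FetchState.FETCHING, offset);
        }

        // Validation bypass for older brokers: only move to FETCHING if a position exists.
        void skipValidation() {
            if (position != null) {
                transitionTo(FetchState.FETCHING, position);
            }
        }
    }

    public static void main(String[] args) {
        PartitionState partition = new PartitionState();
        partition.requestReset();
        partition.skipValidation(); // no position yet, so the state is unchanged
        System.out.println(partition.fetchState() + " fetchable=" + partition.isFetchable());
        partition.completeReset(100L);
        System.out.println(partition.fetchState() + " fetchable=" + partition.isFetchable());
    }
}
```

In this sketch, calling `skipValidation()` between `requestReset()` and `completeReset(...)` is a no-op, which corresponds to the race between the offset reset completing and the first fetch request being issued.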
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  2 +-
 .../consumer/internals/ConsumerCoordinator.java    |  4 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 23 ++++++---
 .../consumer/internals/SubscriptionState.java      | 57 +++++++++++++++++-----
 .../internals/ConsumerCoordinatorTest.java         | 20 ++++----
 .../clients/consumer/internals/FetcherTest.java    | 33 +++++++++++--
 .../consumer/internals/SubscriptionStateTest.java  | 55 +++++++++++++++++++--
 7 files changed, 157 insertions(+), 37 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7535a9b..610cfde 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2420,7 +2420,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         // If there are partitions still needing a position and a reset policy is defined,
         // request reset using the default policy. If no reset strategy is defined and there
         // are partitions with a missing position, then we will raise an exception.
-        subscriptions.resetMissingPositions();
+        subscriptions.resetInitializingPositions();
 
         // Finally send an asynchronous request to lookup and update the positions of any
         // partitions which are awaiting reset.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 914071b..66e702a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -783,9 +783,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * @return true iff the operation completed within the timeout
      */
     public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
-        final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
+        final Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
 
-        final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, timer);
+        final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(initializingPartitions, timer);
         if (offsets == null) return false;
 
         for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 3b34f14..d6dcda4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -207,7 +207,7 @@ public class Fetcher<K, V> implements Closeable {
     /**
      * Represents data about an offset returned by a broker.
      */
-    private static class ListOffsetData {
+    static class ListOffsetData {
         final long offset;
         final Long timestamp; //  null if the broker does not support returning timestamps
         final Optional<Integer> leaderEpoch; // empty if the leader epoch is not known
@@ -495,6 +495,7 @@ public class Fetcher<K, V> implements Closeable {
         Map<TopicPartition, FetchPosition> partitionsToValidate = subscriptions
                 .partitionsNeedingValidation(time.milliseconds())
                 .stream()
+                .filter(tp -> subscriptions.position(tp) != null)
                 .collect(Collectors.toMap(Function.identity(), subscriptions::position));
 
         validateOffsetsAsync(partitionsToValidate);
@@ -675,6 +676,10 @@ public class Fetcher<K, V> implements Closeable {
                     completedFetch.partition);
         } else {
             FetchPosition position = subscriptions.position(completedFetch.partition);
+            if (position == null) {
+                throw new IllegalStateException("Missing position for fetchable partition " + completedFetch.partition);
+            }
+
             if (completedFetch.nextFetchOffset == position.offset) {
                 List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
 
@@ -714,7 +719,8 @@ public class Fetcher<K, V> implements Closeable {
         return emptyList();
     }
 
-    private void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
+    // Visible for testing
+    void resetOffsetIfNeeded(TopicPartition partition, OffsetResetStrategy requestedResetStrategy, ListOffsetData offsetData) {
         FetchPosition position = new FetchPosition(
                 offsetData.offset, offsetData.leaderEpoch, metadata.currentLeader(partition));
         offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch));
@@ -1132,14 +1138,18 @@ public class Fetcher<K, V> implements Closeable {
         long currentTimeMs = time.milliseconds();
 
         for (TopicPartition partition : fetchablePartitions()) {
-            // Use the preferred read replica if set, or the position's leader
             FetchPosition position = this.subscriptions.position(partition);
+            if (position == null) {
+                throw new IllegalStateException("Missing position for fetchable partition " + partition);
+            }
+
             Optional<Node> leaderOpt = position.currentLeader.leader;
             if (!leaderOpt.isPresent()) {
                 metadata.requestUpdate();
                 continue;
             }
 
+            // Use the preferred read replica if set, otherwise the position's leader
             Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
             if (client.isUnavailable(node)) {
                 client.maybeThrowAuthFailure(node);
@@ -1281,11 +1291,12 @@ public class Fetcher<K, V> implements Closeable {
                 Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
                 if (!clearedReplicaId.isPresent()) {
                     // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
-                    if (fetchOffset != subscriptions.position(tp).offset) {
+                    FetchPosition position = subscriptions.position(tp);
+                    if (position == null || fetchOffset != position.offset) {
                         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                                "does not match the current offset {}", tp, fetchOffset, position);
                     } else {
-                        handleOffsetOutOfRange(subscriptions.position(tp), tp, "error response in offset fetch");
+                        handleOffsetOutOfRange(position, tp, "error response in offset fetch");
                     }
                 } else {
                     log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 0d64942..b9885d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -454,7 +454,7 @@ public class SubscriptionState {
                 return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
             } else {
                 // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
-                completeValidation(tp);
+                assignedState(tp).updatePositionLeaderNoValidation(leaderAndEpoch);
                 return false;
             }
         } else {
@@ -630,8 +630,8 @@ public class SubscriptionState {
         return assignment.stream().allMatch(state -> state.value().hasValidPosition());
     }
 
-    public synchronized Set<TopicPartition> missingFetchPositions() {
-        return collectPartitions(state -> !state.hasPosition(), Collectors.toSet());
+    public synchronized Set<TopicPartition> initializingPartitions() {
+        return collectPartitions(state -> state.fetchState.equals(FetchStates.INITIALIZING), Collectors.toSet());
     }
 
     private <T extends Collection<TopicPartition>> T collectPartitions(Predicate<TopicPartitionState> filter, Collector<TopicPartition, ?, T> collector) {
@@ -642,12 +642,12 @@ public class SubscriptionState {
     }
 
 
-    public synchronized void resetMissingPositions() {
+    public synchronized void resetInitializingPositions() {
         final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
         assignment.stream().forEach(state -> {
             TopicPartition tp = state.topicPartition();
             TopicPartitionState partitionState = state.value();
-            if (!partitionState.hasPosition()) {
+            if (partitionState.fetchState.equals(FetchStates.INITIALIZING)) {
                 if (defaultResetStrategy == OffsetResetStrategy.NONE)
                     partitionsWithNoOffsets.add(tp);
                 else
@@ -745,6 +745,11 @@ public class SubscriptionState {
             if (nextState.equals(newState)) {
                 this.fetchState = nextState;
                 runIfTransitioned.run();
+                if (this.position == null && nextState.requiresPosition()) {
+                    throw new IllegalStateException("Transitioned subscription state to " + nextState + ", but position is null");
+                } else if (!nextState.requiresPosition()) {
+                    this.position = null;
+                }
             }
         }
 
@@ -782,6 +787,13 @@ public class SubscriptionState {
             });
         }
 
+        /**
+         * Check if the position exists and needs to be validated. If so, enter the AWAIT_VALIDATION state. This method
+         * also will update the position with the current leader and epoch.
+         *
+         * @param currentLeaderAndEpoch leader and epoch to compare the offset with
+         * @return true if the position is now awaiting validation
+         */
         private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEpoch) {
             if (this.fetchState.equals(FetchStates.AWAIT_RESET)) {
                 return false;
@@ -799,6 +811,18 @@ public class SubscriptionState {
             return this.fetchState.equals(FetchStates.AWAIT_VALIDATION);
         }
 
+        /**
+         * For older versions of the API, we cannot perform offset validation so we simply transition directly to FETCHING
+         */
+        private void updatePositionLeaderNoValidation(Metadata.LeaderAndEpoch currentLeaderAndEpoch) {
+            if (position != null) {
+                transitionState(FetchStates.FETCHING, () -> {
+                    this.position = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);
+                    this.nextRetryTimeMs = null;
+                });
+            }
+        }
+
         private void validatePosition(FetchPosition position) {
             if (position.offsetEpoch.isPresent() && position.currentLeader.epoch.isPresent()) {
                 transitionState(FetchStates.AWAIT_VALIDATION, () -> {
@@ -848,7 +872,7 @@ public class SubscriptionState {
         }
 
         private boolean hasPosition() {
-            return fetchState.hasPosition();
+            return position != null;
         }
 
         private boolean isPaused() {
@@ -924,10 +948,19 @@ public class SubscriptionState {
             }
         }
 
+        /**
+         * Return the valid states which this state can transition to
+         */
         Collection<FetchState> validTransitions();
 
-        boolean hasPosition();
+        /**
+         * Test if this state requires a position to be set
+         */
+        boolean requiresPosition();
 
+        /**
+         * Test if this state is considered to have a valid position which can be used for fetching
+         */
         boolean hasValidPosition();
     }
 
@@ -943,7 +976,7 @@ public class SubscriptionState {
             }
 
             @Override
-            public boolean hasPosition() {
+            public boolean requiresPosition() {
                 return false;
             }
 
@@ -960,7 +993,7 @@ public class SubscriptionState {
             }
 
             @Override
-            public boolean hasPosition() {
+            public boolean requiresPosition() {
                 return true;
             }
 
@@ -977,8 +1010,8 @@ public class SubscriptionState {
             }
 
             @Override
-            public boolean hasPosition() {
-                return true;
+            public boolean requiresPosition() {
+                return false;
             }
 
             @Override
@@ -994,7 +1027,7 @@ public class SubscriptionState {
             }
 
             @Override
-            public boolean hasPosition() {
+            public boolean requiresPosition() {
                 return true;
             }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index d61010e..7b2495c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2342,7 +2342,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(100L, subscriptions.position(t1p).offset);
     }
@@ -2364,7 +2364,7 @@ public class ConsumerCoordinatorTest {
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
         // Offset gets loaded, but requires validation
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertFalse(subscriptions.hasAllFetchPositions());
         assertTrue(subscriptions.awaitingValidation(t1p));
         assertEquals(subscriptions.position(t1p).offset, 100L);
@@ -2415,7 +2415,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(100L, subscriptions.position(t1p).offset);
     }
@@ -2443,12 +2443,12 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(singleton(t1p));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.UNSTABLE_OFFSET_COMMIT, "", -1L));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
-        assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions());
+        assertEquals(Collections.singleton(t1p), subscriptions.initializingPartitions());
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(0L));
-        assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions());
+        assertEquals(Collections.singleton(t1p), subscriptions.initializingPartitions());
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(0L));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(100L, subscriptions.position(t1p).offset);
     }
@@ -2474,7 +2474,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(100L, subscriptions.position(t1p).offset);
     }
@@ -2488,7 +2488,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", -1L));
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions());
+        assertEquals(Collections.singleton(t1p), subscriptions.initializingPartitions());
         assertEquals(Collections.emptySet(), subscriptions.partitionsNeedingReset(time.milliseconds()));
         assertFalse(subscriptions.hasAllFetchPositions());
         assertNull(subscriptions.position(t1p));
@@ -2502,7 +2502,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.seek(t1p, 500L);
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertTrue(subscriptions.hasAllFetchPositions());
         assertEquals(500L, subscriptions.position(t1p).offset);
         assertTrue(coordinator.coordinatorUnknown());
@@ -2516,7 +2516,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.requestOffsetReset(t1p, OffsetResetStrategy.EARLIEST);
         coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions());
+        assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
         assertFalse(subscriptions.hasAllFetchPositions());
         assertEquals(Collections.singleton(t1p), subscriptions.partitionsNeedingReset(time.milliseconds()));
         assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(t1p));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 2010965..d13e778 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1190,7 +1190,7 @@ public class FetcherTest {
         assertEquals(0, fetcher.fetchedRecords().size());
         assertTrue(subscriptions.isOffsetResetNeeded(tp0));
         assertNull(subscriptions.validPosition(tp0));
-        assertNotNull(subscriptions.position(tp0));
+        assertNull(subscriptions.position(tp0));
     }
 
     @Test
@@ -3853,8 +3853,6 @@ public class FetcherTest {
         }, new OffsetsForLeaderEpochResponse(singletonMap(tp0, epochEndOffset)));
         consumerClient.poll(time.timer(Duration.ZERO));
 
-        assertEquals(initialOffset, subscriptions.position(tp0).offset);
-
         if (offsetResetStrategy == OffsetResetStrategy.NONE) {
             OffsetOutOfRangeException thrown =
                 assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded());
@@ -3968,6 +3966,35 @@ public class FetcherTest {
     }
 
     @Test
+    public void testSkipValidationForOlderApiVersion() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(tp0.topic(), 4);
+
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
+
+        // Start with metadata, epoch=1
+        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+                Collections.emptyMap(), partitionCounts, tp -> 1), false, 0L);
+
+        // Request offset reset
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // Since we have no position due to reset, no fetch is sent
+        assertEquals(0, fetcher.sendFetches());
+
+        // Still no position, ensure offset validation logic did not transition us to FETCHING state
+        assertEquals(0, fetcher.sendFetches());
+
+        // Complete reset and now we can fetch
+        fetcher.resetOffsetIfNeeded(tp0, OffsetResetStrategy.LATEST,
+                new Fetcher.ListOffsetData(100, 1L, Optional.empty()));
+        assertEquals(1, fetcher.sendFetches());
+    }
+
+    @Test
     public void testTruncationDetected() {
         // Create some records that include a leader epoch (1)
         MemoryRecordsBuilder builder = MemoryRecords.builder(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 7aeba97..6d71ebc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -42,7 +42,7 @@ import java.util.regex.Pattern;
 import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class SubscriptionStateTest {
@@ -210,7 +210,7 @@ public class SubscriptionStateTest {
         state.requestOffsetReset(tp0);
         assertFalse(state.isFetchable(tp0));
         assertTrue(state.isOffsetResetNeeded(tp0));
-        assertNotNull(state.position(tp0));
+        assertNull(state.position(tp0));
 
         // seek should clear the reset and make the partition fetchable
         state.seek(tp0, 0);
@@ -600,7 +600,7 @@ public class SubscriptionStateTest {
         assertEquals(Optional.empty(), divergentOffsetMetadataOpt);
         assertFalse(state.awaitingValidation(tp0));
         assertTrue(state.isOffsetResetNeeded(tp0));
-        assertEquals(initialPosition, state.position(tp0));
+        assertNull(state.position(tp0));
     }
 
     @Test
@@ -673,4 +673,53 @@ public class SubscriptionStateTest {
 
     }
 
+    @Test
+    public void resetOffsetNoValidation() {
+        // Check that offset reset works when we can't validate offsets (older brokers)
+
+        Node broker1 = new Node(1, "localhost", 9092);
+        state.assignFromUser(Collections.singleton(tp0));
+
+        // Reset offsets
+        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+
+        // Attempt to validate with older API version, should do nothing
+        ApiVersions oldApis = new ApiVersions();
+        oldApis.update("1", NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(oldApis, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
+        assertFalse(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertTrue(state.isOffsetResetNeeded(tp0));
+
+        // Complete the reset via unvalidated seek
+        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L));
+        assertTrue(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertFalse(state.isOffsetResetNeeded(tp0));
+
+        // Next call to validate offsets does nothing
+        assertFalse(state.maybeValidatePositionForCurrentLeader(oldApis, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
+        assertTrue(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertFalse(state.isOffsetResetNeeded(tp0));
+
+        // Reset again, and complete it with a seek that would normally require validation
+        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(10), new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(2))));
+        // We are now in AWAIT_VALIDATION
+        assertFalse(state.hasValidPosition(tp0));
+        assertTrue(state.awaitingValidation(tp0));
+        assertFalse(state.isOffsetResetNeeded(tp0));
+
+        // Now ensure next call to validate clears the validation state
+        assertFalse(state.maybeValidatePositionForCurrentLeader(oldApis, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.of(2))));
+        assertTrue(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertFalse(state.isOffsetResetNeeded(tp0));
+    }
+
 }