Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 21:16:25 UTC

[GitHub] [kafka] mumrah opened a new pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

mumrah opened a new pull request #8841:
URL: https://github.com/apache/kafka/pull/8841


   Also ensure we are updating the position with the current leader info even when
   fetching from older brokers.
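
   For context, the state machine involved is the FetchStates enum inside SubscriptionState. The sketch below is a simplified illustration of the inconsistency being fixed, based on my reading of the review thread rather than the actual diff; the exact values, and the later rename of hasPosition to requiresPosition, are discussed in the comments below.

   ```java
   // Simplified sketch of the consumer's per-partition fetch states (illustration only,
   // not the real SubscriptionState code). The bug: AWAIT_RESET reported that it had a
   // position even though the position may be null while an offset reset is pending,
   // which is what the added null checks in this PR guard against.
   enum FetchStates {
       INITIALIZING {
           public boolean hasPosition() { return false; }
           public boolean hasValidPosition() { return false; }
       },
       FETCHING {
           public boolean hasPosition() { return true; }
           public boolean hasValidPosition() { return true; }
       },
       AWAIT_RESET {
           // Reported true before this fix; position can be null while resetting.
           public boolean hasPosition() { return false; }
           public boolean hasValidPosition() { return false; }
       },
       AWAIT_VALIDATION {
           public boolean hasPosition() { return true; }
           public boolean hasValidPosition() { return false; }
       };

       public abstract boolean hasPosition();
       public abstract boolean hasValidPosition();
   }
   ```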
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441036886



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,41 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                     completedFetch.partition);
         } else {
             FetchPosition position = subscriptions.position(completedFetch.partition);
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
-                        partRecords.size(), position, completedFetch.partition);
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
-                    subscriptions.position(completedFetch.partition, nextPosition);
-                }
+            if (position != null) {
+                if (completedFetch.nextFetchOffset == position.offset) {
+                    List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
+
+                    log.trace("Returning {} fetched records at offset {} for assigned partition {}",
+                            partRecords.size(), position, completedFetch.partition);
+
+                    if (completedFetch.nextFetchOffset > position.offset) {
+                        FetchPosition nextPosition = new FetchPosition(
+                                completedFetch.nextFetchOffset,
+                                completedFetch.lastEpoch,
+                                position.currentLeader);
+                        log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
+                        subscriptions.position(completedFetch.partition, nextPosition);
+                    }
 
-                Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel);
-                if (partitionLag != null)
-                    this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
+                    Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel);
+                    if (partitionLag != null)
+                        this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
 
-                Long lead = subscriptions.partitionLead(completedFetch.partition);
-                if (lead != null) {
-                    this.sensors.recordPartitionLead(completedFetch.partition, lead);
-                }
+                    Long lead = subscriptions.partitionLead(completedFetch.partition);
+                    if (lead != null) {
+                        this.sensors.recordPartitionLead(completedFetch.partition, lead);
+                    }
 
-                return partRecords;
+                    return partRecords;
+                } else {
+                    // these records aren't next in line based on the last consumed position, ignore them
+                    // they must be from an obsolete request
+                    log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
+                            completedFetch.partition, completedFetch.nextFetchOffset, position);
+                }
             } else {
-                // these records aren't next in line based on the last consumed position, ignore them
-                // they must be from an obsolete request
-                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                        completedFetch.partition, completedFetch.nextFetchOffset, position);
+                log.warn("Ignoring fetched records for {} at offset {} since the current position is undefined",

Review comment:
       Sounds fine to me.







[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r438867535



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,41 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                     completedFetch.partition);
         } else {
             FetchPosition position = subscriptions.position(completedFetch.partition);
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
-                        partRecords.size(), position, completedFetch.partition);
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
-                    subscriptions.position(completedFetch.partition, nextPosition);
-                }
+            if (position != null) {

Review comment:
       I think we need the check inside the transition since we sometimes set the position in the runnable. E.g., 
   
   ```java
    private void transitionState(FetchState newState, Runnable runIfTransitioned) {
        FetchState nextState = this.fetchState.transitionTo(newState);
        if (nextState.equals(newState)) {
            this.fetchState = nextState;
            runIfTransitioned.run();
            if (this.position == null && nextState.hasPosition()) {
                throw new IllegalStateException("Transitioned subscription state to " + nextState + ", but position is null");
            }
        }
    }
   ```







[GitHub] [kafka] ijuma commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r438346871



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
             @Override
             public boolean hasPosition() {

Review comment:
       It's a bit weird to add a comment here and not elsewhere. It looks like a clear bug since `hasValidPosition` returns `false`. Generally, comments are useful when it's not obvious. What kind of clarification would you like to see?







[GitHub] [kafka] hachikuji commented on pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#issuecomment-645667180


   ok to test





[GitHub] [kafka] hachikuji commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441680618



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -782,6 +787,13 @@ private void reset(OffsetResetStrategy strategy) {
             });
         }
 
+        /**
+         * Check if the position exists and needs to be validated. If so, enter the AWAIT_VALIDATION state. This method
+         * also will update the position with the current leader and epoch.
+         *
+         * @param currentLeaderAndEpoch leader and epoch to compare the offset with
+         * @return

Review comment:
       nit: document the return value

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -783,7 +783,7 @@ public boolean rejoinNeededOrPending() {
      * @return true iff the operation completed within the timeout
      */
     public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
-        final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
+        final Set<TopicPartition> missingFetchPositions = subscriptions.initializingPartitions();

Review comment:
       nit: rename variable as well?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1281,11 +1291,12 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
                 Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
                 if (!clearedReplicaId.isPresent()) {
                     // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
-                    if (fetchOffset != subscriptions.position(tp).offset) {
+                    FetchPosition position = subscriptions.position(tp);
+                    if (position != null && fetchOffset != position.offset) {
                         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                                "does not match the current offset {}", tp, fetchOffset, position);
                     } else {
-                        handleOffsetOutOfRange(subscriptions.position(tp), tp, "error response in offset fetch");
+                        handleOffsetOutOfRange(position, tp, "error response in offset fetch");

Review comment:
       Hmm, if the position is null, we raise out of range?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
##########
@@ -673,4 +673,37 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 
     }
 
+    @Test
+    public void resetOffsetNoValidation() {
+        // Check that offset reset works when we can't validate offsets (older brokers)
+
+        Node broker1 = new Node(1, "localhost", 9092);
+        state.assignFromUser(Collections.singleton(tp0));
+
+        // Reset offsets
+        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+
+        // Attempt to validate with older API version, should do nothing
+        ApiVersions oldApis = new ApiVersions();
+        oldApis.update("1", NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(oldApis, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
+        assertFalse(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertTrue(state.isOffsetResetNeeded(tp0));
+
+        // Complete the reset via unvalidated seek
+        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L));

Review comment:
       This test might be more interesting if we did a seek which required validation. Could we provide an epoch in the fetch position? Maybe both cases should be covered?
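
   For reference, a second case along those lines might look roughly like the sketch below. It reuses the fixtures from the test quoted above (state, tp0, the usual assert imports) and is only an illustration of the suggestion, not the test that was eventually committed.

   ```java
   // Sketch: complete the reset with a position that carries an offset epoch and a
   // known leader, so the new position requires validation before fetching resumes.
   @Test
   public void resetOffsetWithValidation() {
       Node broker1 = new Node(1, "localhost", 9092);
       state.assignFromUser(Collections.singleton(tp0));
       state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);

       state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
               new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));

       // The reset is complete, but the new position still needs validation
       assertFalse(state.isOffsetResetNeeded(tp0));
       assertTrue(state.awaitingValidation(tp0));
       assertFalse(state.hasValidPosition(tp0));
   }
   ```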

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -924,10 +951,19 @@ default FetchState transitionTo(FetchState newState) {
             }
         }
 
+        /**
+         * Return the valid states which this state can transition to
+         */
         Collection<FetchState> validTransitions();
 
-        boolean hasPosition();
+        /**
+         * Test if this state has a position

Review comment:
       nit: fix doc

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -799,6 +806,21 @@ private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp
             return this.fetchState.equals(FetchStates.AWAIT_VALIDATION);
         }
 
+        /**
+         * For older versions of the API, we cannot perform offset validation so we simply transition directly to FETCHING
+         *
+         * @param currentLeaderAndEpoch
+         */
+        private void updatePositionLeaderNoValidation(Metadata.LeaderAndEpoch currentLeaderAndEpoch) {
+            if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) {
+                FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);

Review comment:
       +1. Moving this into the transition function below seems reasonable.
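
   For illustration, the suggestion could look something like the sketch below; it assumes the transitionState helper and FetchPosition constructor shown elsewhere in this thread and is not the merged code.

   ```java
   // Sketch: fold the leader/epoch refresh into the FETCHING transition so the
   // position update and the state change happen together, and the new position
   // is only constructed when the transition actually applies.
   private void updatePositionLeaderNoValidation(Metadata.LeaderAndEpoch currentLeaderAndEpoch) {
       if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) {
           transitionState(FetchStates.FETCHING, () -> {
               this.position = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);
           });
       }
   }
   ```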







[GitHub] [kafka] chia7712 commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r437913208



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
             @Override
             public boolean hasPosition() {

Review comment:
       As it produced a bug, could you add some comment for this method?







[GitHub] [kafka] hachikuji merged pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #8841:
URL: https://github.com/apache/kafka/pull/8841


   





[GitHub] [kafka] mumrah commented on pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#issuecomment-643035118


   The test failures seem to be flaky (KAFKA-10155)





[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441007131



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,41 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                     completedFetch.partition);
         } else {
             FetchPosition position = subscriptions.position(completedFetch.partition);
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
-                        partRecords.size(), position, completedFetch.partition);
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
-                    subscriptions.position(completedFetch.partition, nextPosition);
-                }
+            if (position != null) {
+                if (completedFetch.nextFetchOffset == position.offset) {
+                    List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
+
+                    log.trace("Returning {} fetched records at offset {} for assigned partition {}",
+                            partRecords.size(), position, completedFetch.partition);
+
+                    if (completedFetch.nextFetchOffset > position.offset) {
+                        FetchPosition nextPosition = new FetchPosition(
+                                completedFetch.nextFetchOffset,
+                                completedFetch.lastEpoch,
+                                position.currentLeader);
+                        log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
+                        subscriptions.position(completedFetch.partition, nextPosition);
+                    }
 
-                Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel);
-                if (partitionLag != null)
-                    this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
+                    Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel);
+                    if (partitionLag != null)
+                        this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
 
-                Long lead = subscriptions.partitionLead(completedFetch.partition);
-                if (lead != null) {
-                    this.sensors.recordPartitionLead(completedFetch.partition, lead);
-                }
+                    Long lead = subscriptions.partitionLead(completedFetch.partition);
+                    if (lead != null) {
+                        this.sensors.recordPartitionLead(completedFetch.partition, lead);
+                    }
 
-                return partRecords;
+                    return partRecords;
+                } else {
+                    // these records aren't next in line based on the last consumed position, ignore them
+                    // they must be from an obsolete request
+                    log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
+                            completedFetch.partition, completedFetch.nextFetchOffset, position);
+                }
             } else {
-                // these records aren't next in line based on the last consumed position, ignore them
-                // they must be from an obsolete request
-                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                        completedFetch.partition, completedFetch.nextFetchOffset, position);
+                log.warn("Ignoring fetched records for {} at offset {} since the current position is undefined",

Review comment:
       Would IllegalStateException make sense here (since we just checked that the partition was fetchable)?







[GitHub] [kafka] hachikuji commented on pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#issuecomment-645682589


   retest this please





[GitHub] [kafka] mumrah commented on pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#issuecomment-642098477


   Added some null checks for `SubscriptionState#position`





[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r438870452



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
             @Override
             public boolean hasPosition() {

Review comment:
       I thought about this too, but if I recall, the whole point of adding the state enum was to not rely on the instance variables to deduce the state. I think another solution here would be to have some kind of empty position monad to avoid the null in the first place. 
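
   For illustration only (this PR does not take this route), the "empty position" idea could look like wrapping the field so callers must handle the absent case explicitly; the class and names below are hypothetical stand-ins, not Kafka code.

   ```java
   import java.util.Optional;

   // Hypothetical illustration of the "empty position" idea: store the position as an
   // Optional so there is no null to propagate in the first place.
   class PartitionFetchState {
       // Minimal stand-in for SubscriptionState.FetchPosition to keep the sketch self-contained.
       static final class FetchPosition {
           final long offset;
           FetchPosition(long offset) { this.offset = offset; }
       }

       private Optional<FetchPosition> position = Optional.empty();

       boolean hasPosition() {
           return position.isPresent();
       }

       FetchPosition positionOrThrow() {
           return position.orElseThrow(() ->
                   new IllegalStateException("No position available for this partition"));
       }

       void seek(long offset) {
           this.position = Optional.of(new FetchPosition(offset));
       }
   }
   ```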







[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441747935



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
##########
@@ -673,4 +673,37 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 
     }
 
+    @Test
+    public void resetOffsetNoValidation() {
+        // Check that offset reset works when we can't validate offsets (older brokers)
+
+        Node broker1 = new Node(1, "localhost", 9092);
+        state.assignFromUser(Collections.singleton(tp0));
+
+        // Reset offsets
+        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+
+        // Attempt to validate with older API version, should do nothing
+        ApiVersions oldApis = new ApiVersions();
+        oldApis.update("1", NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(oldApis, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
+        assertFalse(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertTrue(state.isOffsetResetNeeded(tp0));
+
+        // Complete the reset via unvalidated seek
+        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L));

Review comment:
       Sure, I'll add another case







[GitHub] [kafka] hachikuji commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441115973



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,40 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                     completedFetch.partition);
         } else {
             FetchPosition position = subscriptions.position(completedFetch.partition);
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
-                        partRecords.size(), position, completedFetch.partition);
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
-                    subscriptions.position(completedFetch.partition, nextPosition);
-                }
+            if (position != null) {

Review comment:
       nit: maybe we could check for null first so that we avoid the nesting below (and reduce the diff)
   ```java
   if (position == null)
     throw new IllegalStateException("Missing position for fetchable partition " + completedFetch.partition);
   
   if (completedFetch.nextFetchOffset == position.offset) {
   ...
   ```







[GitHub] [kafka] hachikuji commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r440984705



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,41 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                     completedFetch.partition);
         } else {
             FetchPosition position = subscriptions.position(completedFetch.partition);
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
-                        partRecords.size(), position, completedFetch.partition);
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
-                    subscriptions.position(completedFetch.partition, nextPosition);
-                }
+            if (position != null) {
+                if (completedFetch.nextFetchOffset == position.offset) {
+                    List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
+
+                    log.trace("Returning {} fetched records at offset {} for assigned partition {}",
+                            partRecords.size(), position, completedFetch.partition);
+
+                    if (completedFetch.nextFetchOffset > position.offset) {
+                        FetchPosition nextPosition = new FetchPosition(
+                                completedFetch.nextFetchOffset,
+                                completedFetch.lastEpoch,
+                                position.currentLeader);
+                        log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
+                        subscriptions.position(completedFetch.partition, nextPosition);
+                    }
 
-                Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel);
-                if (partitionLag != null)
-                    this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
+                    Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel);
+                    if (partitionLag != null)
+                        this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
 
-                Long lead = subscriptions.partitionLead(completedFetch.partition);
-                if (lead != null) {
-                    this.sensors.recordPartitionLead(completedFetch.partition, lead);
-                }
+                    Long lead = subscriptions.partitionLead(completedFetch.partition);
+                    if (lead != null) {
+                        this.sensors.recordPartitionLead(completedFetch.partition, lead);
+                    }
 
-                return partRecords;
+                    return partRecords;
+                } else {
+                    // these records aren't next in line based on the last consumed position, ignore them
+                    // they must be from an obsolete request
+                    log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
+                            completedFetch.partition, completedFetch.nextFetchOffset, position);
+                }
             } else {
-                // these records aren't next in line based on the last consumed position, ignore them
-                // they must be from an obsolete request
-                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                        completedFetch.partition, completedFetch.nextFetchOffset, position);
+                log.warn("Ignoring fetched records for {} at offset {} since the current position is undefined",

Review comment:
       This comment applies to a few of the added null checks where we have already validated that the partition is "fetchable." I am wondering if it would be more consistent to raise an exception.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -924,10 +949,19 @@ default FetchState transitionTo(FetchState newState) {
             }
         }
 
+        /**
+         * Return the valid states which this state can transition to
+         */
         Collection<FetchState> validTransitions();
 
+        /**
+         * Test if this state has a position

Review comment:
       Since the usage is a bit different, maybe we could change the name to `requiresPosition`. Then this check seems a little more intuitive:
   ```java
                   if (this.position == null && nextState.requiresPosition()) {
                       throw new IllegalStateException("Transitioned subscription state to " + nextState + ", but position is null");
                   }
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -745,6 +745,9 @@ private void transitionState(FetchState newState, Runnable runIfTransitioned) {
             if (nextState.equals(newState)) {
                 this.fetchState = nextState;
                 runIfTransitioned.run();
+                if (this.position == null && nextState.hasPosition()) {

Review comment:
       Would it make sense to set `position` explicitly to null if the `FetchState` does not expect to have it? For example, it seems that currently, when we reset the offset, we leave `position` at whatever value it had previously. If we were initializing, then it would be null. If we had an offset out of range, it would be non-null. It might be easier to reason about the logic if it is always null in the AWAIT_RESET state.
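
   Concretely, that could look something like the sketch below, applied to the reset(...) method visible earlier in this thread; this is an illustration of the suggestion, not the merged change.

   ```java
   // Sketch: clear the position whenever we enter AWAIT_RESET, so the state never
   // carries a stale position (whether we arrived from INITIALIZING, where it was
   // already null, or from an offset-out-of-range while FETCHING, where it was not).
   private void reset(OffsetResetStrategy strategy) {
       transitionState(FetchStates.AWAIT_RESET, () -> {
           this.resetStrategy = strategy;
           this.position = null;
       });
   }
   ```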

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -647,7 +647,7 @@ public synchronized void resetMissingPositions() {
         assignment.stream().forEach(state -> {
             TopicPartition tp = state.topicPartition();
             TopicPartitionState partitionState = state.value();
-            if (!partitionState.hasPosition()) {
+            if (partitionState.fetchState.equals(FetchStates.INITIALIZING)) {

Review comment:
       Should we change the name of this method to something like `resetInitializingPositions`?








[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r439510121



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
             @Override
             public boolean hasPosition() {

Review comment:
       I added some javadoc to the interface







[GitHub] [kafka] mumrah commented on pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#issuecomment-645693665


   retest this please





[GitHub] [kafka] chia7712 commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r437918610



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -799,6 +806,21 @@ private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp
             return this.fetchState.equals(FetchStates.AWAIT_VALIDATION);
         }
 
+        /**
+         * For older versions of the API, we cannot perform offset validation so we simply transition directly to FETCHING
+         *
+         * @param currentLeaderAndEpoch
+         */
+        private void updatePositionLeaderNoValidation(Metadata.LeaderAndEpoch currentLeaderAndEpoch) {
+            if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) {
+                FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);

Review comment:
       nit: ```newPosition``` can be created lazily.







[GitHub] [kafka] hachikuji commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441776235



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1281,11 +1291,12 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
                 Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
                 if (!clearedReplicaId.isPresent()) {
                     // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
-                    if (fetchOffset != subscriptions.position(tp).offset) {
+                    FetchPosition position = subscriptions.position(tp);
+                    if (position != null && fetchOffset != position.offset) {
                         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                                "does not match the current offset {}", tp, fetchOffset, position);
                     } else {
-                        handleOffsetOutOfRange(subscriptions.position(tp), tp, "error response in offset fetch");
+                        handleOffsetOutOfRange(position, tp, "error response in offset fetch");

Review comment:
       Since we have the check for `hasValidPosition` at the start of this method, we _could_ raise an exception. However, in the success case, we currently just ignore the response if the position is null. I'm ok with either option.







[GitHub] [kafka] hachikuji commented on pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#issuecomment-645746951


   It looks like the test failure is a known issue, which was recently reopened: https://issues.apache.org/jira/browse/KAFKA-9509.





[GitHub] [kafka] hachikuji commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r438362895



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
             @Override
             public boolean hasPosition() {

Review comment:
       Would it make sense to move `hasPosition` to `TopicPartitionState`? Then we could just turn this into a null check on `position`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,41 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                     completedFetch.partition);
         } else {
             FetchPosition position = subscriptions.position(completedFetch.partition);
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
-                        partRecords.size(), position, completedFetch.partition);
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
-                    subscriptions.position(completedFetch.partition, nextPosition);
-                }
+            if (position != null) {

Review comment:
       I think the invariant that we try to maintain is that we should have a position if we are in the FETCHING state. I'd suggest we detect this in `transitionState` and raise the exception at that point. Otherwise, we could reach an illegal state and the consumer would just stop fetching the partition. Failing fast is probably preferable. What I have in mind is just something like this:
   ```java
           private void transitionState(FetchState newState, Runnable runIfTransitioned) {
               FetchState nextState = this.fetchState.transitionTo(newState);
               if (nextState.equals(newState)) {
                   if (position == null && (nextState == FETCHING || nextState == AWAIT_VALIDATION))
                      throw new IllegalStateException();
                   this.fetchState = nextState;
                   runIfTransitioned.run();
               }
           }
   ```
   







[GitHub] [kafka] hachikuji commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r439009094



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
             @Override
             public boolean hasPosition() {

Review comment:
       Sure, but that led to the opposite problem, in which the enum was inconsistent with the state. In regard to position, I think we should handle this at transition time as mentioned below. If we ensure that position is not null in the fetching and validating states, then I don't see a problem changing `hasPosition` to check it directly.







[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r438379247



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,41 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                     completedFetch.partition);
         } else {
             FetchPosition position = subscriptions.position(completedFetch.partition);
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
-                        partRecords.size(), position, completedFetch.partition);
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
-                    subscriptions.position(completedFetch.partition, nextPosition);
-                }
+            if (position != null) {

Review comment:
       Yea, I see what you mean. It's probably a good idea to add a check here. 







[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441933113



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1281,11 +1291,12 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
                 Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
                 if (!clearedReplicaId.isPresent()) {
                     // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
-                    if (fetchOffset != subscriptions.position(tp).offset) {
+                    FetchPosition position = subscriptions.position(tp);
+                    if (position != null && fetchOffset != position.offset) {
                         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                                "does not match the current offset {}", tp, fetchOffset, position);
                     } else {
-                        handleOffsetOutOfRange(subscriptions.position(tp), tp, "error response in offset fetch");
+                        handleOffsetOutOfRange(position, tp, "error response in offset fetch");

Review comment:
       I'm inclined to keep the same behavior as we previously did and just warn (not that we expect this case to get hit anymore)







[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r439445841



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -978,7 +1000,7 @@ public boolean hasValidPosition() {
 
             @Override
             public boolean hasPosition() {

Review comment:
       Ok, sounds good







[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441109398



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -745,6 +745,9 @@ private void transitionState(FetchState newState, Runnable runIfTransitioned) {
             if (nextState.equals(newState)) {
                 this.fetchState = nextState;
                 runIfTransitioned.run();
+                if (this.position == null && nextState.hasPosition()) {

Review comment:
       Yea, sounds good 👍 
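    
    Sketched standalone, this is the invariant the new check in the diff enforces. The body of the `if` is not shown there, so throwing `IllegalStateException` is an assumption made for illustration:
    
    ```java
    public class TransitionInvariantSketch {
        enum State {
            FETCHING(true), AWAIT_RESET(false);
            final boolean hasPosition;
            State(boolean hasPosition) { this.hasPosition = hasPosition; }
        }
    
        private State fetchState = State.AWAIT_RESET;
        private Long position = null; // stand-in for FetchPosition
    
        State state() { return fetchState; }
    
        void transitionState(State newState, Runnable runIfTransitioned) {
            this.fetchState = newState;
            runIfTransitioned.run();
            // Fail fast if a state that is supposed to carry a position is entered
            // while the position is still null.
            if (this.position == null && newState.hasPosition) {
                throw new IllegalStateException("Transitioned to " + newState + " with a null position");
            }
        }
    }
    ```
    
    Checking the invariant at transition time keeps the bug surfaced close to where it is introduced, rather than later when the null position is dereferenced during a fetch.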







[GitHub] [kafka] mumrah commented on a change in pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441742901



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1281,11 +1291,12 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
                 Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
                 if (!clearedReplicaId.isPresent()) {
                     // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
-                    if (fetchOffset != subscriptions.position(tp).offset) {
+                    FetchPosition position = subscriptions.position(tp);
+                    if (position != null && fetchOffset != position.offset) {
                         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                                "does not match the current offset {}", tp, fetchOffset, position);
                     } else {
-                        handleOffsetOutOfRange(subscriptions.position(tp), tp, "error response in offset fetch");
+                        handleOffsetOutOfRange(position, tp, "error response in offset fetch");

Review comment:
       What should we do here for a null position? This can happen if we get an OFFSET_OUT_OF_RANGE error while we're in the middle of a reset. Maybe we should just log a warning? Or maybe just change the condition to
   
   ```java
   if (position == null || fetchOffset != position.offset)
   ```







[GitHub] [kafka] mumrah commented on pull request #8841: KAFKA-10123 Fix incorrect value for AWAIT_RESET#hasPosition

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#issuecomment-645054474


   Ran the client system tests here; all passed except for one, which looks flaky:
   
   http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-16--001.1592335112--mumrah--KAFKA-10123--63c1b14a4/report.html
   
   ```
   ====================================================================================================
   SESSION REPORT (ALL TESTS)
   ducktape version: 0.7.7
   session_id:       2020-06-16--001
   run time:         62 minutes 47.115 seconds
   tests run:        61
   passed:           60
   failed:           1
   ignored:          0
   ====================================================================================================
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=0.10.0.1
   status:     PASS
   run time:   1 minute 0.624 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=0.10.1.1
   status:     PASS
   run time:   57.225 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=0.10.2.2
   status:     PASS
   run time:   57.211 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=0.11.0.3
   status:     PASS
   run time:   59.375 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=1.0.2
   status:     PASS
   run time:   1 minute 1.141 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=1.1.1
   status:     PASS
   run time:   58.956 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=2.0.1
   status:     PASS
   run time:   1 minute 3.180 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=2.1.1
   status:     PASS
   run time:   1 minute 8.069 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=2.2.2
   status:     PASS
   run time:   1 minute 4.166 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=2.3.1
   status:     PASS
   run time:   1 minute 4.140 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=2.4.1
   status:     PASS
   run time:   1 minute 5.373 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest.run_compatibility_test.broker_version=dev
   status:     PASS
   run time:   1 minute 2.516 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=0.10.0.1
   status:     PASS
   run time:   2 minutes 21.015 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=0.10.1.1
   status:     PASS
   run time:   2 minutes 25.114 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=0.10.2.2
   status:     PASS
   run time:   2 minutes 20.943 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=0.11.0.3
   status:     PASS
   run time:   2 minutes 27.628 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=1.0.2
   status:     PASS
   run time:   2 minutes 23.814 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=1.1.1
   status:     PASS
   run time:   2 minutes 25.035 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=2.0.1
   status:     PASS
   run time:   2 minutes 26.260 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=2.1.1
   status:     PASS
   run time:   2 minutes 30.776 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=2.2.2
   status:     PASS
   run time:   2 minutes 29.093 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=2.3.1
   status:     PASS
   run time:   2 minutes 34.028 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=2.4.1
   status:     PASS
   run time:   2 minutes 32.940 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest.test_produce_consume.broker_version=dev
   status:     PASS
   run time:   2 minutes 28.548 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.pluggable_test.PluggableConsumerTest.test_start_stop
   status:     PASS
   run time:   20.868 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RangeAssignor
   status:     PASS
   run time:   55.992 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
   status:     PASS
   run time:   55.605 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.assignment_strategy=org.apache.kafka.clients.consumer.StickyAssignor
   status:     PASS
   run time:   56.918 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.message_format_change_test.MessageFormatChangeTest.test_compatibility.consumer_version=0.9.0.1.producer_version=0.9.0.1
   status:     PASS
   run time:   3 minutes 47.147 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.message_format_change_test.MessageFormatChangeTest.test_compatibility.consumer_version=0.10.2.2.producer_version=0.10.2.2
   status:     PASS
   run time:   4 minutes 42.586 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=all
   status:     PASS
   run time:   57.221 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.consumer_num=2.quota_type=client-id
   status:     PASS
   run time:   3 minutes 9.596 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=stable
   status:     PASS
   run time:   1 minute 7.739 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=all
   status:     PASS
   run time:   1 minute 4.560 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.compression_test.CompressionTest.test_compressed_topic.compression_types=.snappy.gzip.lz4.zstd.none
   status:     PASS
   run time:   2 minutes 7.014 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.message_format_change_test.MessageFormatChangeTest.test_compatibility.consumer_version=dev.producer_version=dev
   status:     PASS
   run time:   4 minutes 22.235 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=stable
   status:     FAIL
   run time:   1 minute 9.961 seconds
   
   
       
   Traceback (most recent call last):
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.7-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
       data = self.run_test()
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.7-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test
       return self.test_context.function(self.test)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.7-py2.7.egg/ducktape/mark/_mark.py", line 428, in wrapper
       return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", line 257, in test_fencing_static_consumer
       assert len(consumer.dead_nodes()) == num_conflict_consumers
   AssertionError
   
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=False.enable_autocommit=False
   status:     PASS
   run time:   1 minute 1.026 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=False.enable_autocommit=True
   status:     PASS
   run time:   1 minute 0.342 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test
   status:     PASS
   run time:   40.060 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=True.enable_autocommit=False
   status:     PASS
   run time:   55.844 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=True.enable_autocommit=True
   status:     PASS
   run time:   56.067 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.old_broker_throttling_behavior=True.quota_type=client-id
   status:     PASS
   run time:   3 minutes 13.626 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=all
   status:     PASS
   run time:   1 minute 25.122 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_rolling_bounce
   status:     PASS
   run time:   2 minutes 49.878 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=True.bounce_mode=rolling
   status:     PASS
   run time:   2 minutes 15.369 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=False
   status:     PASS
   run time:   1 minute 0.818 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True
   status:     PASS
   run time:   1 minute 0.885 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_group_consumption
   status:     PASS
   run time:   1 minute 5.476 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.static_membership=True.clean_shutdown=True.bounce_mode=all.num_bounces=5
   status:     PASS
   run time:   1 minute 20.141 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.static_membership=False.clean_shutdown=True.bounce_mode=all.num_bounces=5
   status:     PASS
   run time:   1 minute 34.796 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.static_membership=False.clean_shutdown=True.bounce_mode=rolling.num_bounces=5
   status:     PASS
   run time:   1 minute 41.124 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.static_membership=True.clean_shutdown=True.bounce_mode=rolling.num_bounces=5
   status:     PASS
   run time:   1 minute 32.181 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.truncation_test.TruncationTest.test_offset_truncate
   status:     PASS
   run time:   1 minute 43.953 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=.user.client-id.override_quota=True
   status:     PASS
   run time:   3 minutes 18.792 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.old_client_throttling_behavior=True
   status:     PASS
   run time:   3 minutes 28.109 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=.user.client-id.override_quota=False
   status:     PASS
   run time:   4 minutes 15.238 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.override_quota=True
   status:     PASS
   run time:   3 minutes 20.559 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.override_quota=False
   status:     PASS
   run time:   4 minutes 16.169 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=user.override_quota=False
   status:     PASS
   run time:   4 minutes 10.209 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=user.override_quota=True
   status:     PASS
   run time:   3 minutes 13.171 seconds
   ----------------------------------------------------------------------------------------------------
   ```
   
   Re-ran the flaky test here: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-16--001.1592340267--mumrah--KAFKA-10123--e12d62f27/report.html
   
   ```
   ====================================================================================================
   SESSION REPORT (ALL TESTS)
   ducktape version: 0.7.7
   session_id:       2020-06-16--001
   run time:         6 minutes 4.729 seconds
   tests run:        16
   passed:           16
   failed:           0
   ignored:          0
   ====================================================================================================
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=all
   status:     PASS
   run time:   56.582 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=all
   status:     PASS
   run time:   1 minute 2.261 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=stable
   status:     PASS
   run time:   1 minute 4.217 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=all
   status:     PASS
   run time:   55.329 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=stable
   status:     PASS
   run time:   1 minute 10.086 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=stable
   status:     PASS
   run time:   1 minute 10.305 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=all
   status:     PASS
   run time:   57.946 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=stable
   status:     PASS
   run time:   1 minute 6.535 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=all
   status:     PASS
   run time:   1 minute 3.113 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=stable
   status:     PASS
   run time:   1 minute 1.881 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=all
   status:     PASS
   run time:   1 minute 2.480 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=stable
   status:     PASS
   run time:   1 minute 8.963 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=all
   status:     PASS
   run time:   59.126 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=1.fencing_stage=stable
   status:     PASS
   run time:   1 minute 4.819 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=all
   status:     PASS
   run time:   1 minute 3.293 seconds
   ----------------------------------------------------------------------------------------------------
   test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_fencing_static_consumer.num_conflict_consumers=2.fencing_stage=stable
   status:     PASS
   run time:   1 minute 7.971 seconds
   ----------------------------------------------------------------------------------------------------
   ```
   

