Posted to jira@kafka.apache.org by "Jason Gustafson (Jira)" <ji...@apache.org> on 2020/06/18 05:34:00 UTC
[jira] [Resolved] (KAFKA-10123) Regression resetting offsets in consumer when fetching from old broker
[ https://issues.apache.org/jira/browse/KAFKA-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-10123.
-------------------------------------
Resolution: Fixed
> Regression resetting offsets in consumer when fetching from old broker
> ----------------------------------------------------------------------
>
> Key: KAFKA-10123
> URL: https://issues.apache.org/jira/browse/KAFKA-10123
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: David Arthur
> Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We saw this error in system tests:
> {code}
> java.lang.NullPointerException
>     at org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1111)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:246)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>     at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:437)
>     at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
>     at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
>     at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
>     at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}
> The logs showed that the consumer was in the middle of an offset reset when this happened. We changed the validation logic in KAFKA-9724 to include the following check with the intent of skipping validation for old brokers:
> {code}
> NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
> if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
>     return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
> } else {
>     // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
>     completeValidation(tp);
>     return false;
> }
> {code}
> The problem seems to be the shortcut call to `completeValidation`, which executes the following logic:
> {code}
> if (hasPosition()) {
>     transitionState(FetchStates.FETCHING, () -> this.nextRetryTimeMs = null);
> }
> {code}
> We should be protected by the call to `hasPosition` here, but in the `AWAIT_RESET` state, `hasPosition` incorrectly returns true. As a result, the partition enters the `FETCHING` state without a position, which ultimately leads to the NPE in `Fetcher.prepareFetchRequests`.
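
The failure mode described above can be illustrated with a minimal, self-contained sketch. This is not the Kafka source: `FetchState`, `PartitionStateSketch`, `hasPositionBuggy`, `hasPositionFixed`, `requestReset` and `fetchOffset` are hypothetical names, and the sketch assumes that requesting a reset clears the stored position. It only shows why a `hasPosition` check that answers true in `AWAIT_RESET` lets `completeValidation` move the partition to `FETCHING` with a null position. The "fixed" variant is one possible guard, not necessarily the change that resolved this issue.

```java
// Simplified stand-in for the consumer's per-partition fetch states (hypothetical).
enum FetchState { INITIALIZING, FETCHING, AWAIT_RESET, AWAIT_VALIDATION }

final class PartitionStateSketch {
    FetchState state = FetchState.INITIALIZING;
    Long position = null; // null until an offset is known

    // Assumption: an offset reset discards the current position.
    void requestReset() {
        state = FetchState.AWAIT_RESET;
        position = null;
    }

    // Buggy check: derives the answer from the state alone and wrongly includes AWAIT_RESET.
    boolean hasPositionBuggy() {
        return state == FetchState.FETCHING
                || state == FetchState.AWAIT_VALIDATION
                || state == FetchState.AWAIT_RESET;
    }

    // One possible guard: AWAIT_RESET never counts, and a position must actually be present.
    boolean hasPositionFixed() {
        return (state == FetchState.FETCHING || state == FetchState.AWAIT_VALIDATION)
                && position != null;
    }

    // Mirrors the shortcut from the issue: go to FETCHING only if we "have a position".
    void completeValidation(boolean useFixedCheck) {
        boolean hasPosition = useFixedCheck ? hasPositionFixed() : hasPositionBuggy();
        if (hasPosition) {
            state = FetchState.FETCHING;
        }
    }

    // Stand-in for building a fetch request: unboxing a null position throws an NPE.
    long fetchOffset() {
        if (state != FetchState.FETCHING) {
            throw new IllegalStateException("not fetchable in state " + state);
        }
        return position;
    }
}
```

With the buggy check, `requestReset()` followed by `completeValidation(false)` leaves the sketch in `FETCHING` with a null position, so `fetchOffset()` throws a `NullPointerException`. With the guarded check, the partition stays in `AWAIT_RESET` until the reset completes.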
--
This message was sent by Atlassian Jira
(v8.3.4#803005)