Posted to jira@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2020/06/09 20:52:00 UTC

[jira] [Commented] (KAFKA-10123) Regression resetting offsets in consumer when fetching from old broker

    [ https://issues.apache.org/jira/browse/KAFKA-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129791#comment-17129791 ] 

John Roesler commented on KAFKA-10123:
--------------------------------------

I've also just marked this as a blocker for 2.5.1: the commit for KAFKA-9724 is included in 2.5.1 as well, which alone is enough reason to bring this fix into that release.

We also think it might have been the root cause of the Streams EOS soak cluster losing all its threads over the course of a week.

I'm including this information in case it helps...

Here's the stack trace:
{code:java}
[2020-06-03 02:19:01,503] ERROR [stream-soak-test-d8d40c42-1175-4611-994a-ff3fde11cef2-StreamThread-1] stream-thread [stream-soak-test-d8d40c42-1175-4611-994a-ff3fde11cef2-StreamThread-1] Error caught during partition revocation, will abort the current process and re-throw at the end of rebalance:  (org.apache.kafka.streams.processor.internals.StreamThread)

org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-d8d40c42-1175-4611-994a-ff3fde11cef2-StreamThread-1] failed to suspend stream tasks
        at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:253)
        at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:116)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:297)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:393)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:745)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [logs.json.zookeeper-0, logs.operator-0, logs.kubernetes-0, logs.json.kafka-0]
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:512)
        at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:632)
        at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:601)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendRunningTasks(AssignedStreamsTasks.java:146)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendOrCloseTasks(AssignedStreamsTasks.java:129)
        at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:246)
        ... 13 more
Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [logs.json.zookeeper-0, logs.operator-0, logs.kubernetes-0, logs.json.kafka-0]
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:658)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2392)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1764)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1723)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:503)
        ... 18 more{code}
Not the same error, but it seems closely related.

Note:

Streams/Client version and commit: 2.5.x 5842d2f3

Broker version and commit: kafka_2.12-2.5.x bad93b775

(the previous soak test with the clients on commit b59f880f did not have this error)
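For reference, the `NoOffsetForPartitionException` in the trace above is what the consumer raises when it finds partitions that have no valid position and no applicable reset strategy. A simplified model of that check (illustrative only, not the actual `SubscriptionState.resetMissingPositions` code; the class and helper names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class ResetCheckDemo {
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    // Hypothetical per-partition view: position may be null while a reset
    // or validation is still in flight.
    record PartitionState(String topicPartition, Long position, ResetStrategy strategy) {}

    // Simplified analogue of the consumer's missing-position sweep:
    // partitions with no position and no usable reset strategy are collected
    // and reported together in one NoOffsetForPartitionException-style error.
    static List<String> findPartitionsWithNoResetPolicy(List<PartitionState> states) {
        List<String> bad = new ArrayList<>();
        for (PartitionState s : states) {
            if (s.position() == null && s.strategy() == ResetStrategy.NONE) {
                bad.add(s.topicPartition());
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        List<String> bad = findPartitionsWithNoResetPolicy(List.of(
                new PartitionState("logs.operator-0", null, ResetStrategy.NONE),
                new PartitionState("logs.kubernetes-0", 42L, ResetStrategy.NONE)));
        System.out.println("Undefined offset with no reset policy for partitions: " + bad);
    }
}
```

Streams normally configures a reset policy for its source topics, so seeing this during a suspend/commit is what suggests the positions were lost mid-reset rather than never configured.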

> Regression resetting offsets in consumer when fetching from old broker
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-10123
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10123
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: David Arthur
>            Priority: Blocker
>             Fix For: 2.6.0, 2.5.1
>
>
> We saw this error in system tests:
> {code}
> java.lang.NullPointerException
>         at org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1111)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:246)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:437)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}
> The logs showed that the consumer was in the middle of an offset reset when this happened. We changed the validation logic in KAFKA-9724 to include the following check with the intent of skipping validation for old brokers:
> {code}
>             NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
>             if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
>                 return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
>             } else {
>                 // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
>                 completeValidation(tp);
>                 return false;
>             }
> {code}
> The problem seems to be the shortcut call to `completeValidation`, which executes the following logic:
> {code}
>             if (hasPosition()) {
>                 transitionState(FetchStates.FETCHING, () -> this.nextRetryTimeMs = null);
>             }
> {code}
> We should be protected by the call to `hasPosition` here, but in the case of the `AWAIT_RESET` state, we are incorrectly returning true. This causes us to enter the `FETCHING` state without a position, which ultimately leads to the NPE.
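The failure mode described above can be sketched as a toy state machine (a minimal illustration of the reported bug, not the actual `SubscriptionState` code; all names here are hypothetical):

```java
// Toy model of the per-partition fetch state machine. It shows how a
// hasPosition() check that wrongly counts AWAIT_RESET as "has a position"
// lets the partition slip into FETCHING with a null position, which the
// fetch path then dereferences -> NullPointerException.
public class FetchStateDemo {
    enum FetchState { AWAIT_RESET, AWAIT_VALIDATION, FETCHING }

    static FetchState state = FetchState.AWAIT_RESET;
    static Long position = null;  // reset in progress, so no position yet

    // Buggy check mirroring the report: it should exclude AWAIT_RESET too.
    static boolean hasPosition() {
        return state != FetchState.AWAIT_VALIDATION;
    }

    // The old-broker shortcut: skip validation and go straight to FETCHING.
    static void completeValidation() {
        if (hasPosition()) {
            state = FetchState.FETCHING;
        }
    }

    static boolean fetchThrowsNpe() {
        state = FetchState.AWAIT_RESET;  // reset still pending, position null
        completeValidation();            // buggy shortcut lands us in FETCHING
        try {
            long offset = position;      // unboxing null -> NullPointerException
            return offset < 0;           // unreachable in the buggy scenario
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE reproduced: " + fetchThrowsNpe());
    }
}
```

With a correct `hasPosition()` (one that also returns false for `AWAIT_RESET`), the shortcut would leave the partition out of `FETCHING` until the reset completes, and `prepareFetchRequests` would never see a fetchable partition without a position.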



--
This message was sent by Atlassian Jira
(v8.3.4#803005)