Posted to jira@kafka.apache.org by "Eno Thereska (JIRA)" <ji...@apache.org> on 2017/06/27 13:46:00 UTC

[jira] [Commented] (KAFKA-4593) Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException

    [ https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064855#comment-16064855 ] 

Eno Thereska commented on KAFKA-4593:
-------------------------------------

This doesn't seem to be a correctness problem: if it happens, this thread dies and another existing thread takes over. The open question is whether we want this thread to keep running, perhaps after it gives up all of its tasks. What is the ideal outcome here? I'd recommend restarting the thread with no tasks and letting rebalancing take care of things again.
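
To make the recommendation above concrete, here is a minimal sketch of a worker that drops all of its tasks and waits for the next rebalance instead of dying. It is illustration only: `RejoinSketch`, `TasksMigrated`, `onAssigned`, and `runOnce` are hypothetical names, not Kafka Streams classes or methods, and the rebalance itself is simulated by calling `onAssigned` directly.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch only; none of these names are Kafka Streams classes.
final class RejoinSketch {

    /** Hypothetical signal that this thread's tasks now belong to another thread. */
    static final class TasksMigrated extends RuntimeException {
        TasksMigrated(String message) {
            super(message);
        }
    }

    private final Set<String> ownedTasks = new HashSet<>();
    private boolean rejoinRequested = false;

    /** Called from a (simulated) rebalance callback with the new assignment. */
    void onAssigned(Collection<String> tasks) {
        ownedTasks.clear();
        ownedTasks.addAll(tasks);
        rejoinRequested = false;
    }

    /** Runs one unit of work; on migration, drops every task instead of dying. */
    void runOnce(Runnable work) {
        try {
            work.run();
        } catch (TasksMigrated e) {
            ownedTasks.clear();      // give up all tasks
            rejoinRequested = true;  // the next rebalance decides what we own
        }
    }

    Set<String> ownedTasks() {
        return Collections.unmodifiableSet(ownedTasks);
    }

    boolean rejoinRequested() {
        return rejoinRequested;
    }
}
```

In this sketch the thread stays alive with an empty task set, so whatever it owns afterwards comes only from a fresh assignment.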

> Task migration during rebalance callback process could lead the obsoleted task's IllegalStateException
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4593
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4593
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: infrastructure
>
> 1. Assume two running threads, A and B, and a single task t1 for simplicity. Threads A and B are on different machines, so their local state directories are not shared.
> 2. The first rebalance is triggered and task t1 is assigned to A (B has no assigned task).
> 3. During the first rebalance callback, task t1's state store needs to be restored on thread A; this happens in "restoreActiveState", reached via "createStreamTask".
> 4. Now suppose thread A has a long GC pause that stalls it. A second rebalance is then triggered and kicks A out of the group; B gets task t1 and goes through the same restoration process. Afterwards, thread B continues to process data and update the state store, while also writing more messages to the changelog (so its log end offset advances).
> 5. After a while A resumes from the long GC pause. Not knowing that it has been kicked out of the group and that it no longer owns task t1, it continues the restoration process but then realizes that the log end offset has advanced. When this happens, we see the following exception on thread A:
> {code}
> java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
>         at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
>         at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
> {code}
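
The sequence in steps 1 to 5 can be reproduced in miniature without Kafka. The sketch below is a simplification under stated assumptions, not the actual `ProcessorStateManager` code: `RestoreSketch`, `Changelog`, and `restore` are hypothetical names, the changelog is reduced to a single counter, and the GC pause is modeled as a callback during which another owner may append records.

```java
// Hypothetical sketch; not the Kafka Streams implementation.
final class RestoreSketch {

    /** Stand-in for a changelog partition: it only tracks a log end offset. */
    static final class Changelog {
        private long endOffset;

        Changelog(long endOffset) {
            this.endOffset = endOffset;
        }

        long endOffset() {
            return endOffset;
        }

        /** Simulates another owner (thread B) writing more records. */
        void append(int records) {
            endOffset += records;
        }
    }

    /**
     * Replays the changelog up to the end offset observed at the start, then
     * verifies that the end offset has not moved in the meantime.
     * The stall callback models thread A's long GC pause.
     */
    static long restore(Changelog log, Runnable stall) {
        long endAtStart = log.endOffset();
        long restored = 0;
        while (restored < endAtStart) {
            restored++;  // pretend to apply one changelog record to the store
        }
        stall.run();     // thread A is paused here; thread B may take over
        long endNow = log.endOffset();
        if (endNow != endAtStart) {
            throw new IllegalStateException(
                "Log end offset should not change while restoring: old end offset "
                    + endAtStart + ", current offset " + endNow);
        }
        return restored;
    }
}
```

With a no-op stall the restore completes normally; if the stall callback calls `append`, the check fails in the same way thread A fails in step 5.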


