Posted to users@kafka.apache.org by Jon Yeargers <jo...@cedexis.com> on 2017/01/01 19:25:49 UTC

0.10.2.0-SNAPSHOT - "log end offset should not change while restoring"

java.lang.IllegalStateException: task [0_6] Log end offset of
RtDetailBreakoutProcessor-table_stream-changelog-6 should not change while
restoring: old end offset 26883455, current offset 26883467

        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)

Re: 0.10.2.0-SNAPSHOT - "log end offset should not change while restoring"

Posted by Guozhang Wang <wa...@gmail.com>.
Jon,

I looked through the code and found one possible explanation of your
observed exception:

1. Assume two running threads A and B, and a single task t1, just for
simplicity.
2. A first rebalance is triggered and task t1 is assigned to A (B has no
assigned task).
3. During the first rebalance callback, task t1's state store needs to be
restored on thread A; this happens in "restoreActiveState", called from
"createStreamTask".
4. Now suppose thread A hits a long GC pause that stalls it. A second
rebalance is then triggered and kicks A out of the group; B gets task t1 and
runs the same restoration process. After restoring, thread B continues to
process data and update the state store, and in doing so writes more
messages to the changelog (so its log end offset keeps advancing).

5. After a while A resumes from the long GC pause. Not knowing it has
actually been kicked out of the group and no longer owns task t1, it
continues the restoration process but then finds that the log end offset has
advanced, and throws the exception above (roughly the check sketched below).
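
To make the failure concrete: the restore path essentially snapshots the
changelog's end offset before replaying it, and fails if that offset has
moved by the time the replay finishes. Below is a minimal sketch of that
kind of check; the class and method bodies are illustrative only and do not
reproduce the actual Kafka Streams source.

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

class ChangelogRestoreSketch {

    void restoreActiveState(Consumer<byte[], byte[]> restoreConsumer, TopicPartition changelog) {
        restoreConsumer.assign(Collections.singletonList(changelog));

        // Record the changelog's end offset before the restore starts.
        restoreConsumer.seekToEnd(Collections.singletonList(changelog));
        final long endOffset = restoreConsumer.position(changelog);

        // Replay the changelog from the beginning into the local state store.
        restoreConsumer.seekToBeginning(Collections.singletonList(changelog));
        while (restoreConsumer.position(changelog) < endOffset) {
            for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100)) {
                applyToStateStore(record.key(), record.value()); // hypothetical state-store write
            }
        }

        // If another thread took over the task during the stall and kept writing to the
        // changelog (step 4 above), the end offset will have advanced by now: fail fast.
        restoreConsumer.seekToEnd(Collections.singletonList(changelog));
        final long currentEndOffset = restoreConsumer.position(changelog);
        if (currentEndOffset != endOffset) {
            throw new IllegalStateException("Log end offset of " + changelog
                + " should not change while restoring: old end offset " + endOffset
                + ", current offset " + currentEndOffset);
        }
    }

    private void applyToStateStore(byte[] key, byte[] value) {
        // Placeholder for writing the restored record into the local store.
    }
}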


To validate whether this is indeed the case, you can check your logs and see:

1. if there are at least two consecutive rebalances triggered (this can be
found in the server-side coordinator's log).
2. if the exception-throwing thread that owned task [0_6] was stalled while
restoring.
3. if task [0_6] was migrated to another thread in the second rebalance.


In the meantime, I will file a JIRA to keep track of this issue, and if it
turns out to be observed commonly we can try to fix it asap.
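
If the logs do confirm that sequence, one thing that can make it less likely
(not a fix for the underlying race) is giving a stalled thread more headroom
before the coordinator kicks it out of the group, for example by raising the
consumer timeouts passed through the streams configuration. The application
id, broker address and timeout values below are illustrative only:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsTimeoutConfigSketch {

    public static Properties buildConfig() {
        Properties props = new Properties();
        // Application id and bootstrap servers are placeholders.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "RtDetailBreakoutProcessor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Illustrative timeouts: allow a longer stall (e.g. a GC pause) before the
        // group coordinator considers the thread dead and triggers another rebalance.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        return props;
    }
}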

Guozhang





-- 
-- Guozhang