Posted to users@kafka.apache.org by Neeraj Vaidya <ne...@yahoo.co.in.INVALID> on 2022/06/01 04:09:36 UTC

KStreams State Store - state.dir does not have .checkpoint file

Hi All,
I have a KStreams application running inside a Docker container which uses a persistent key-value store. 

I have configured state.dir with a value of /tmp/kafka-streams (which is the default).
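For reference, here is a minimal configuration sketch. It uses the documented config keys as plain strings, so it needs no Kafka classes. `StreamsStateConfig` is a hypothetical helper. The `exactly_once_v2` value is an assumption: it rests on the "under EOS" wording in the log below and on Kafka 3.0 or later. Kafka Streams keeps each application's state under `<state.dir>/<application.id>`, so the container-side state.dir is the path the host directory has to be mounted onto.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper: the string keys are the documented Kafka Streams
// config names, so no Kafka classes are needed to build the Properties.
class StreamsStateConfig {

    static Properties build(String applicationId, String stateDir) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        // Container-side path; it must be the target of the bind mount, e.g.
        // docker run -v /mnt/storage/kafka-streams:/tmp/kafka-streams ...
        props.setProperty("state.dir", stateDir);
        // Assumption: exactly-once is enabled (the log says "under EOS");
        // exactly_once_v2 is the value name used by Kafka 3.0 and later.
        props.setProperty("processing.guarantee", "exactly_once_v2");
        return props;
    }

    // Each application's task directories live under <state.dir>/<application.id>.
    static Path appStateDir(Properties props) {
        return Paths.get(props.getProperty("state.dir"), props.getProperty("application.id"));
    }
}
```

The resulting `Properties` can be passed to the `KafkaStreams(Topology, Properties)` constructor.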

When I start this container using "docker run", I mount /tmp/kafka-streams to a directory on my host machine, for example /mnt/storage/kafka-streams.

My application.id is "myapp". I have 288 partitions in my input topic, which means my state store's changelog topic will also have that many partitions. Accordingly, when I start my Docker container, I see a folder for each partition, such as 0_1, 0_2 ... 0_288, under /mnt/storage/kafka-streams/myapp/
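Each of those folders is named after a task id of the form `<subtopologyId>_<partition>`. Kafka numbers partitions from 0, so a single sub-topology over 288 input partitions gives 0_0 through 0_287. The small sketch below lists the expected names. `TaskDirNames` is a hypothetical helper. The assumption of a single sub-topology with id 0 comes from the `0_` prefix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the task directory names expected for one sub-topology
// reading an input topic with the given number of partitions.
class TaskDirNames {

    static List<String> forSubtopology(int subtopologyId, int partitions) {
        List<String> names = new ArrayList<>(partitions);
        // Task ids are <subtopologyId>_<partition>; partitions start at 0.
        for (int p = 0; p < partitions; p++) {
            names.add(subtopologyId + "_" + p);
        }
        return names;
    }
}
```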

When I shut down my application, I do not see a checkpoint file in any of the partition directories.
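To confirm this across all 288 directories at once, you can scan for task directories that lack the file. The sketch below is a hypothetical diagnostic, not a Kafka tool. It assumes that task directories match `<digits>_<digits>` and that the file is named `.checkpoint`, as described in this thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical diagnostic: list task directories under <state.dir>/<application.id>
// that contain no .checkpoint file, e.g. after the container has been stopped.
class CheckpointScanner {

    static List<String> tasksWithoutCheckpoint(Path appStateDir) {
        try (Stream<Path> children = Files.list(appStateDir)) {
            return children
                    .filter(Files::isDirectory)
                    // Assumed task directory naming: <subtopologyId>_<partition>, e.g. 0_170.
                    .filter(dir -> dir.getFileName().toString().matches("\\d+_\\d+"))
                    .filter(dir -> !Files.exists(dir.resolve(".checkpoint")))
                    .map(dir -> dir.getFileName().toString())
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Run it against the host path (for example /mnt/storage/kafka-streams/myapp) after stopping the container. An empty result suggests that the last shutdown was clean.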

And when I restart my application, it starts fetching the records from the changelog topic rather than reading from local disk. I suspect this is because there is no .checkpoint file in any of the partition directories. 

This is what I see in the startup log. It seems to be bootstrapping the entire state store from the changelog topic, i.e. performing network I/O rather than reading what is already on disk:

"
2022-05-31T12:08:02.791 [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  o.a.k.s.p.i.ProcessorStateManager - MSG=stream-thread [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] task [0_170] State store MyAppRecordStore did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 0_170 before re-bootstrapping
2022-05-31T12:08:02.791 [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] Detected the states of tasks [0_170] are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_170] are corrupted and hence needs to be re-initialized
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.initializeStoreOffsetsFromCheckpoint(ProcessorStateManager.java:254)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:109)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
        at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433)
        at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
"
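The first WARN line states the rule being applied. Below is a simplified model of that decision. It is written from the log message, not from Kafka's source, and all names are hypothetical:

- A checkpoint lets the task resume from its recorded changelog offset.
- With no checkpoint, non-empty local state under exactly-once is treated as corrupted and wiped.
- Otherwise, the store is rebuilt from the start of the changelog.

```java
// Simplified model of the start-up rule described by the WARN message;
// NOT Kafka's implementation, and every name here is hypothetical.
enum RestoreAction { RESUME_FROM_CHECKPOINT, RESTORE_FROM_BEGINNING, WIPE_AND_RESTORE_FROM_BEGINNING }

class RestoreDecision {

    static RestoreAction decide(boolean checkpointFound, boolean hasLocalData, boolean exactlyOnce) {
        if (checkpointFound) {
            // A clean shutdown recorded offsets: only replay the changelog after them.
            return RestoreAction.RESUME_FROM_CHECKPOINT;
        }
        if (hasLocalData && exactlyOnce) {
            // Data on disk with no checkpoint may include uncommitted writes, so the
            // task is treated as corrupted (TaskCorruptedException) and wiped first.
            return RestoreAction.WIPE_AND_RESTORE_FROM_BEGINNING;
        }
        // No offsets to resume from: rebuild from the start of the changelog.
        return RestoreAction.RESTORE_FROM_BEGINNING;
    }
}
```

The thread's situation is `decide(false, true, true)`: no `.checkpoint` file, a non-empty RocksDB directory and EOS enabled. That combination is why every task is wiped and fully restored.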

1) Should I expect to see a checkpoint file in each of the partition directories under /mnt/storage/kafka-streams/myapp/ when I shut down my application?

2) Is this an issue because I am running my KStreams app inside a Docker container? If there were permission issues, I would have expected to see problems creating the other files as well, such as .lock or the rocksdb folder (and its contents).
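To check what a clean shutdown actually wrote, you can inspect a task's `.checkpoint` file by hand. The parser below ASSUMES the plain-text layout of Kafka's internal `OffsetCheckpoint` class: a version line `0`, an entry count, then one `<topic> <partition> <offset>` line per changelog partition. This is an internal format and may change between releases. `CheckpointFile` is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical reader for a task's .checkpoint file, ASSUMING this layout:
//   line 1: format version (0)
//   line 2: number of entries
//   then one "<topic> <partition> <offset>" line per changelog partition.
class CheckpointFile {

    // Returns "<topic>-<partition>" -> checkpointed offset.
    static Map<String, Long> parse(String content) {
        List<String> lines = content.lines()
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .collect(Collectors.toList());
        Map<String, Long> offsets = new LinkedHashMap<>();
        if (lines.isEmpty()) {
            return offsets; // empty file: nothing recorded
        }
        int version = Integer.parseInt(lines.get(0));
        if (version != 0) {
            throw new IllegalArgumentException("Unsupported checkpoint version: " + version);
        }
        if (lines.size() < 2) {
            throw new IllegalArgumentException("Missing entry count");
        }
        int expected = Integer.parseInt(lines.get(1));
        for (String line : lines.subList(2, lines.size())) {
            String[] parts = line.split("\\s+");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Malformed line: " + line);
            }
            int partition = Integer.parseInt(parts[1]);
            offsets.put(parts[0] + "-" + partition, Long.parseLong(parts[2]));
        }
        if (offsets.size() != expected) {
            throw new IllegalArgumentException(
                    "Expected " + expected + " entries but found " + offsets.size());
        }
        return offsets;
    }
}
```

For example, `CheckpointFile.parse("0\n1\nmyapp-MyAppRecordStore-changelog 170 42\n")` returns a one-entry map from `myapp-MyAppRecordStore-changelog-170` to `42`. The topic name follows the `<application.id>-<storeName>-changelog` convention, and the offset is made up. On Java 11 or later, you can read a real file with `Files.readString(Path.of("/mnt/storage/kafka-streams/myapp/0_170/.checkpoint"))`.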

My runtime environment is Docker 1.13.1 on RHEL 7.

Re: KStreams State Store - state.dir does not have .checkpoint file

Posted by Steven Schlansker <st...@gmail.com>.
> On Jun 1, 2022, at 5:27 AM, Neeraj Vaidya <ne...@yahoo.co.in.INVALID> wrote:
> 
> Thanks John!
> It seems that sending a TERM signal to my KStreams application running inside a Docker container results in a clean shutdown.
> This also creates the checkpoint files successfully.
> So I guess I need to figure out how to send a TERM signal to the Java KStreams application running inside the Docker container.
> The KStreams application is actually launched by an entrypoint.sh script in the Docker container. If I send a signal to this container using "docker kill", the signal does not get passed to the Java application spawned by the entrypoint.sh script.

Launching directly from a shell script in a container doesn't work very well: you need a proper init substitute in your container to handle signal management and the reaping of zombie subprocesses.

We use dumb-init:
https://github.com/Yelp/dumb-init

dumb-init calls into our launcher script, which then `exec`s the Java process.
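A hypothetical start-up self-check along these lines can make the problem visible in the logs, instead of only after a slow restart. It relies on Docker delivering `docker stop` and `docker kill` signals to PID 1 in the container. The JVM receives them in two cases:

- It is PID 1 itself, because the launcher script ended in `exec java ...`.
- Its parent is a forwarding init such as dumb-init or tini.

`docker-init` is assumed to be the name of the binary injected by `docker run --init`. The name matching is a heuristic and an assumption, not an exhaustive test.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical heuristic (not part of Kafka or Docker): warn when the JVM is
// probably not the process that receives the container's stop signal.
class ContainerSignalCheck {

    // Assumed binary names of signal-forwarding init processes.
    private static final Set<String> FORWARDING_INITS = Set.of("dumb-init", "tini", "docker-init");

    static boolean likelyReceivesStopSignal(long pid, Optional<String> parentCommand) {
        if (pid == 1) {
            return true; // the launcher script exec'd the JVM, so it gets signals directly
        }
        return parentCommand
                .map(cmd -> cmd.substring(cmd.lastIndexOf('/') + 1)) // strip the directory part
                .map(FORWARDING_INITS::contains)
                .orElse(false);
    }

    static void warnIfStopSignalMayBeLost() {
        ProcessHandle self = ProcessHandle.current();
        Optional<String> parentCmd = self.parent().flatMap(p -> p.info().command());
        if (!likelyReceivesStopSignal(self.pid(), parentCmd)) {
            System.err.println("WARNING: JVM is not PID 1 and its parent (" + parentCmd.orElse("unknown")
                    + ") does not look like a signal-forwarding init; SIGTERM may never reach it.");
        }
    }
}
```

Calling `ContainerSignalCheck.warnIfStopSignalMayBeLost()` early in `main` would flag the setup described above, where `entrypoint.sh` starts Java without `exec`. In that setup, the JVM's parent is the shell.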




Re: KStreams State Store - state.dir does not have .checkpoint file

Posted by Neeraj Vaidya <ne...@yahoo.co.in.INVALID>.
Thanks John!
It seems that sending a TERM signal to my KStreams application running inside a Docker container results in a clean shutdown.
This also creates the checkpoint files successfully.
So I guess I need to figure out how to send a TERM signal to the Java KStreams application running inside the Docker container.
The KStreams application is actually launched by an entrypoint.sh script in the Docker container. If I send a signal to this container using "docker kill", the signal does not get passed to the Java application spawned by the entrypoint.sh script.

Regards,
Neeraj

Re: KStreams State Store - state.dir does not have .checkpoint file

Posted by John Roesler <vv...@apache.org>.
Hi Neeraj,

Thanks for all that detail! Your expectation is correct. You should see the checkpoint files after a _clean_ shutdown, and then you should not see it bootstrap from the beginning of the changelog on the next startup.

How are you shutting down the application? You'll want to call KafkaStreams#close() and wait for it to complete before stopping the Java process.
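Here is a minimal sketch of that pattern, assuming the usual approach of a JVM shutdown hook. The hook calls `close()`, and a latch lets the main thread wait until closing has finished. To keep the sketch free of Kafka dependencies, the application is any `AutoCloseable`. In a real app it would be the `KafkaStreams` instance, whose no-argument `close()` blocks until its stream threads have stopped. `GracefulShutdown` is a hypothetical helper name.

```java
import java.util.concurrent.CountDownLatch;

// Sketch of "close on SIGTERM and wait". `app` stands in for the KafkaStreams
// instance; any AutoCloseable works, so the sketch has no Kafka dependency.
class GracefulShutdown {

    // Builds (but does not register) a hook that closes the app, then releases the latch.
    static Thread hookFor(AutoCloseable app, CountDownLatch closed) {
        return new Thread(() -> {
            try {
                app.close(); // for KafkaStreams, blocks until all stream threads have stopped
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closed.countDown();
            }
        }, "streams-shutdown-hook");
    }

    // Registers the hook; the JVM runs it on normal exit and on SIGTERM/SIGINT, not on SIGKILL.
    static CountDownLatch install(AutoCloseable app) {
        CountDownLatch closed = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(hookFor(app, closed));
        return closed;
    }
}
```

In `main`, calling `GracefulShutdown.install(streams).await()` after `streams.start()` keeps the process alive until the hook has finished closing. Note that the JVM does not run shutdown hooks on SIGKILL:

- `docker stop` sends SIGTERM and then SIGKILL after a grace period (10 seconds by default, adjustable with `-t`).
- A plain `docker kill` sends SIGKILL unless `--signal` is given.

If closing all 288 tasks takes longer than the grace period, the checkpoint files may still not be written.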

I hope this helps,
-John
