Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2017/04/01 04:32:16 UTC

Re: Difference between with and without state store cleanup streams startup

Hi,
Ok, so basically what I understand is that there is no global offset for the
changelog topic maintained at the broker level.
Every local state store maintains its offset in a local checkpoint file.
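The checkpoint idea above can be made concrete with a small, self-contained Java sketch (no Kafka dependency). It assumes a hypothetical, simplified checkpoint format with one "topic partition offset" line per changelog partition, and treats the stored offset as the next offset to restore from. Kafka Streams' real checkpoint file has its own format and location under the state directory, so the class name, file name, and topic name here are illustrative only. A missing checkpoint means restoration has to start from the beginning of the changelog.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical, simplified checkpoint file: one "topic partition offset" line per
// changelog partition. This is NOT Kafka Streams' actual on-disk format.
final class LocalCheckpoint {
    private final Path file;

    LocalCheckpoint(Path file) {
        this.file = file;
    }

    // Persist, per changelog partition (keyed as "topic partition"), the next offset to restore from.
    void write(Map<String, Long> offsets) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            lines.add(e.getKey() + " " + e.getValue());
        }
        try {
            Files.write(file, lines, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the checkpoint back; a missing file (e.g. after kill -9 or cleanUp()) yields an empty map.
    Map<String, Long> read() {
        Map<String, Long> offsets = new HashMap<>();
        if (!Files.exists(file)) {
            return offsets;
        }
        try {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                String[] parts = line.trim().split("\\s+");
                if (parts.length == 3) {
                    offsets.put(parts[0] + " " + parts[1], Long.parseLong(parts[2]));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }

    // Empty result means "no checkpoint: replay this changelog partition from the beginning".
    OptionalLong restoreStartOffset(String topicPartition) {
        Long offset = read().get(topicPartition);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-dir");
        LocalCheckpoint checkpoint = new LocalCheckpoint(dir.resolve("checkpoint"));
        System.out.println(checkpoint.restoreStartOffset("my-store-changelog 0"));

        Map<String, Long> offsets = new HashMap<>();
        offsets.put("my-store-changelog 0", 42L);
        checkpoint.write(offsets);
        System.out.println(checkpoint.restoreStartOffset("my-store-changelog 0"));
    }
}
```

Running main prints OptionalLong.empty before the checkpoint is written and OptionalLong[42] after it.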

And to make sure a state store builds or rebuilds its state faster when
reading from the changelog topic, we need to ensure that changelog
topics are compacted efficiently.
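The compaction point above, together with Matthias's answers 1 and 2 quoted below, can be illustrated with a toy model in plain Java (no Kafka dependency; class names are hypothetical). compact keeps only the newest record per key, and restore replays a log into an empty map, so a later value overwrites an earlier one. Replaying the compacted log reads fewer records yet yields the same store contents. This is a simplification: Kafka's log cleaner compacts incrementally and never cleans the active segment, and the sketch ignores tombstones (null values), so a real restore usually reads more than this ideal compacted log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a changelog topic partition: records in offset order.
// It illustrates compaction semantics only; it is not Kafka's log cleaner.
final class ChangelogCompaction {

    static final class ChangelogRecord {
        final String key;
        final String value;

        ChangelogRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Keep only the newest record for each key, preserving offset order among the survivors.
    static List<ChangelogRecord> compact(List<ChangelogRecord> log) {
        Map<String, Integer> lastIndex = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            lastIndex.put(log.get(i).key, i);
        }
        List<ChangelogRecord> compacted = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (lastIndex.get(log.get(i).key) == i) {
                compacted.add(log.get(i));
            }
        }
        return compacted;
    }

    // Rebuild a store by replaying every record; a later value for a key overwrites an earlier one.
    static Map<String, String> restore(List<ChangelogRecord> log) {
        Map<String, String> store = new LinkedHashMap<>();
        for (ChangelogRecord r : log) {
            store.put(r.key, r.value);
        }
        return store;
    }

    public static void main(String[] args) {
        List<ChangelogRecord> log = List.of(
                new ChangelogRecord("a", "1"), new ChangelogRecord("b", "1"),
                new ChangelogRecord("a", "2"), new ChangelogRecord("a", "3"),
                new ChangelogRecord("b", "2"));
        List<ChangelogRecord> compacted = compact(log);
        System.out.println(log.size() + " records before compaction, " + compacted.size() + " after");
        System.out.println("same store: " + restore(log).equals(restore(compacted)));
    }
}
```

Running main prints "5 records before compaction, 2 after" and "same store: true".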

I hope these assumptions are correct.

Thanks
Sachin


On Sat, Apr 1, 2017 at 4:51 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> 1. The whole log will be read.
>
> 2. It will read all the key-value pairs. However, once state recovery has
> finished, the store will contain only the latest record for each key.
>
> For both (1) and (2), note that changelog topics are compacted; thus,
> it will not read everything since you started your app, as log
> compaction removes old records that were "overwritten" by newer
> records.
>
> 3. The instance will hit the timeout and you will get an exception.
>
> 4. and 5. It depends. On a clean shutdown, Streams writes a checkpoint
> file and thus knows the state of the store. Therefore, it does not need
> to read the changelog to recover the store. On an unclean shutdown (like
> kill -9) the checkpoint file will be missing, and thus Streams will wipe
> out the state and recreate it from scratch (thus, you don't need to call
> .cleanUp() manually).
>
> 6.
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L99
>
> called here:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1294
>
>
> -Matthias
>
> On 3/31/17 11:46 AM, Sachin Mittal wrote:
> > Hi,
> > There are two ways to restart a Streams application:
> > 1. Execute streams.cleanUp() before streams.start().
> > This cleans up the local state store.
> >
> > 2. Just call streams.start().
> >
> > What are the differences between the two?
> >
> > As I understand it, in the first case it will try to create the local state
> > store by replaying the changelog topic.
> >
> > So my questions are:
> > 1. Will it try to replay the whole log from the earliest offset or from
> > the last committed offset?
> > 2. Will it read and fetch all the values from the topic for a given key,
> > or only the last value for a key, when creating a state store for that
> > changelog topic?
> > 3. What happens if the time to create the state store is greater than
> > max.poll.interval.ms?
> >
> > 4. If we don't delete the state store, what happens? Does it again try
> > to recreate the same store by reading the entire changelog topic? Or
> > does it somehow determine from the state store what the latest offset
> > is and update the state store with values from that offset onwards?
> >
> > 5. In case of an unclean shutdown, say by kill -9, is it advisable to
> > clean up the local state store before restarting, or can we just start
> > it normally?
> >
> > 6. Finally, can anyone point me to the code where it creates the state
> > store by reading from the changelog topic?
> >
> > Thanks
> > Sachin
> >
>
>
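Matthias's answers 4 and 5 boil down to one decision at startup. The toy sketch below, in plain Java with no Kafka dependency (class and method names are hypothetical), models it: with a checkpoint, the existing local store is kept and only records from the checkpointed offset onward are replayed; without one, the store is wiped and the whole changelog is replayed. In the real API, calling KafkaStreams#cleanUp() before start() deletes the application's local state directory, which puts you in the second branch. The sketch deliberately ignores offset gaps left by compaction and the max.poll.interval.ms interaction from question 3.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Toy model of the startup decision described in answers 4 and 5 above.
// The changelog is a list indexed by offset (no compaction gaps); not Kafka's StoreChangelogReader.
final class StartupRestore {

    static final class Change {
        final String key;
        final String value;

        Change(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Restores 'store' in place and returns how many changelog records were replayed.
    // Checkpoint present (clean shutdown): keep local state, replay only records from that offset on.
    // Checkpoint absent (unclean shutdown, or cleanUp() before start()): wipe and replay from offset 0.
    static int restore(Map<String, String> store, OptionalLong checkpoint, List<Change> changelog) {
        int from;
        if (checkpoint.isPresent()) {
            from = Math.toIntExact(checkpoint.getAsLong());
        } else {
            store.clear();
            from = 0;
        }
        int replayed = 0;
        for (int offset = from; offset < changelog.size(); offset++) {
            Change change = changelog.get(offset);
            store.put(change.key, change.value);
            replayed++;
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<Change> changelog = List.of(
                new Change("a", "1"), new Change("b", "1"), new Change("a", "2"));

        // Clean shutdown after offsets 0 and 1 were applied: the checkpoint says "resume at 2".
        Map<String, String> clean = new HashMap<>(Map.of("a", "1", "b", "1"));
        int cleanReplayed = restore(clean, OptionalLong.of(2), changelog);

        // Unclean shutdown: local files may be stale and there is no checkpoint, so replay everything.
        Map<String, String> unclean = new HashMap<>(Map.of("a", "stale"));
        int uncleanReplayed = restore(unclean, OptionalLong.empty(), changelog);

        System.out.println("clean: replayed " + cleanReplayed + ", store " + clean);
        System.out.println("unclean: replayed " + uncleanReplayed + ", store " + unclean);
    }
}
```

Both paths end with the same store contents; the difference is how much of the changelog has to be read, which is why skipping cleanUp() after a clean shutdown restarts faster.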

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Yes. That's correct.

-Matthias

On 3/31/17 9:32 PM, Sachin Mittal wrote: