Posted to users@kafka.apache.org by Debasish Ghosh <gh...@gmail.com> on 2017/07/14 06:49:36 UTC

State management & restore functionality

Hi -

I have a question, mostly to clarify some concepts regarding
state management and restore functionality in Kafka Streams.

When I have multiple instances of the same application running (same
application id for each of the instances), are the following assumptions
correct?

   1. each instance has a separate state store (local)
   2. all instances are backed up by a *single* changelog topic

Now the question arises: how does restore work in this case, when a single
changelog topic backs up multiple state stores?

Each instance of the application ingests data from specific partitions of
the topic, and there can be multiple topics too. For example, if we have m topics
with n partitions each and p instances of the application, then all the
(m x n) partitions are distributed across the p instances of the
application. Is this true?

If so, does the changelog topic also have (m x n) partitions, so that
Kafka knows which state to restore into which store during a restore
operation?

And finally, if we decide to merge topics / partitions midway without a
complete reset of the application, (a) will it work, (b) will the changelog
topic be updated accordingly, and (c) is this recommended?

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re: State management & restore functionality

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Streams does use one changelog topic per store (not just a single global
changelog topic per application). Thus, the number of partitions can be
different for different stores/changelog topics within one application.
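A rough sketch of what "one changelog topic per store" looks like: Kafka Streams names each store's changelog `<application.id>-<storeName>-changelog`, so two stores yield two topics, which can have different partition counts. The plain-Java model below only reproduces that naming; the application id `my-app`, the store names and the partition counts are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the changelog naming convention only; it does not call the Kafka Streams API.
final class ChangelogTopics {
    // Kafka Streams names a store's changelog topic "<application.id>-<storeName>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Maps each store to its own changelog topic, carrying over the store's partition count
    // (assumed here to equal the partition count of the input feeding that store).
    static Map<String, Integer> changelogPartitions(String applicationId,
                                                    Map<String, Integer> storePartitions) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : storePartitions.entrySet()) {
            result.put(changelogTopic(applicationId, e.getKey()), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> stores = new LinkedHashMap<>();
        stores.put("order-counts", 3);   // hypothetical store fed by a 3-partition topic
        stores.put("user-profiles", 6);  // hypothetical store fed by a 6-partition topic
        // Two stores -> two changelog topics, each with its own partition count.
        System.out.println(changelogPartitions("my-app", stores));
    }
}
```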

About partition assignment: it depends a little bit on the structure of
your program (i.e., the DAG structure). But in general, partitions of
different topics are co-located. Assume you have 2 input topics T1 and T2
with 3 partitions each and 2 application instances: instance 1 would get
T1-0 and T2-0 assigned and instance 2 would get T1-1 and T2-1 assigned.
The remaining partitions T1-2 and T2-2 might be on either instance (so
either both on instance 1 or both on instance 2).

For this case, the changelog topic would have 3 partitions (same as the
input topics).
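The co-location in the T1/T2 example can be modelled as follows: task p reads partition p of every input topic, and the tasks are then spread over the instances. This is a simplified sketch, not the real assignor; the round-robin placement is an assumption (the actual Streams assignor also weighs stickiness and load, so the leftover task may land on either instance, as noted above).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of co-partitioned assignment; NOT the real StreamsPartitionAssignor.
final class CoPartitionedAssignment {
    // One task per partition number p; task p reads partition p of every input topic,
    // so each topic-partition belongs to exactly one task.
    static Map<Integer, List<String>> tasks(List<String> topics, int partitions) {
        Map<Integer, List<String>> tasks = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            List<String> topicPartitions = new ArrayList<>();
            for (String topic : topics) {
                topicPartitions.add(topic + "-" + p);
            }
            tasks.put(p, topicPartitions);
        }
        return tasks;
    }

    // Assumed round-robin placement of tasks onto instances (instance indices start at 0).
    static Map<Integer, List<Integer>> assign(int taskCount, int instances) {
        Map<Integer, List<Integer>> byInstance = new TreeMap<>();
        for (int i = 0; i < instances; i++) {
            byInstance.put(i, new ArrayList<>());
        }
        for (int task = 0; task < taskCount; task++) {
            byInstance.get(task % instances).add(task);
        }
        return byInstance;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> taskMap = tasks(List.of("T1", "T2"), 3);
        Map<Integer, List<Integer>> assignment = assign(taskMap.size(), 2);
        for (Map.Entry<Integer, List<Integer>> e : assignment.entrySet()) {
            for (int task : e.getValue()) {
                // The store of task t is backed by changelog partition t, hence 3 changelog partitions.
                System.out.println("instance " + (e.getKey() + 1) + " -> task " + task + " " + taskMap.get(task));
            }
        }
    }
}
```

With 3 tasks and 2 instances, this particular placement puts tasks 0 and 2 on instance 1 and task 1 on instance 2; another assignor could just as well put task 2 on instance 2.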

About modifying the input topics: this is not allowed and will break
your application. Once a changelog topic has been created, it is not
modified anymore. Thus, if you change the number of input topic
partitions, it no longer matches the number of changelog topic partitions
and Streams will raise an exception. Using the reset tool is mandatory
to fix this case.
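A hypothetical illustration of why repartitioning the input breaks an existing application: the changelog keeps the partition count it was created with, so a consistency check against the new input count fails. The class, method and exception type are made up for this sketch; the real check and error message inside Streams differ.

```java
// Hypothetical, simplified consistency check; this is not Kafka Streams code.
final class ChangelogValidation {
    static void validate(String changelogTopic, int changelogPartitions, int inputPartitions) {
        // The changelog is never altered after creation, so its count must still match the input.
        if (changelogPartitions != inputPartitions) {
            throw new IllegalStateException("Changelog " + changelogTopic + " has "
                    + changelogPartitions + " partitions but the input now has "
                    + inputPartitions + "; reset the application");
        }
    }

    public static void main(String[] args) {
        validate("my-app-store-changelog", 3, 3); // unchanged input: passes
        try {
            validate("my-app-store-changelog", 3, 4); // input grew from 3 to 4 partitions
        } catch (IllegalStateException e) {
            System.out.println("Fails: " + e.getMessage());
        }
    }
}
```

The way out is the reset tool (`kafka-streams-application-reset.sh`, run with `--application-id`), which deletes the application's internal topics so they can be recreated with the new partition count.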

This blog post gives more details:
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/


-Matthias


On 7/14/17 5:53 AM, Eno Thereska wrote:
> Hi Debasish,
> 
> Your intuition about the first part is correct. Kafka Streams automatically assigns a partition of a topic to 
> a task in an instance. It will never be the case that the same partition is assigned to two tasks.
> 
> About the merging or changing of partitions part, it would help if we know more about what you 
> are trying to do. For example, if behind the scenes you add or remove partitions that would not work
> well with Kafka Streams. However, if you use the Kafka Streams itself to create new topics (e.g., 
> by merging two topics into one, or vice versa by taking one topic and splitting it into more topics), then
> that would work fine.
> 
> Eno


Re: State management & restore functionality

Posted by Debasish Ghosh <gh...@gmail.com>.
Thanks Eno for the clarification. I did some more digging and found that
the compaction interval can be configured, and that compaction of a topic
takes place for all segments except the active one currently being written.
This is all useful information. Thanks again.
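The behaviour described above can be sketched as a toy model: only closed segments are deduplicated by key, while the active segment is left untouched, so a key can briefly appear more than once. The segment layout and records are hypothetical, and the broker's real log cleaner (cleaning thresholds, tombstone handling, segment merging) is far more involved than this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of log compaction: closed segments are cleaned, the last (active) segment is not.
final class SegmentCompaction {
    record Rec(long offset, String key, String value) {}

    // Assumes at least one segment; the last one is treated as the active segment.
    static List<List<Rec>> compact(List<List<Rec>> segments) {
        int active = segments.size() - 1;
        // Latest offset per key, looking only at closed segments (the active one is skipped).
        Map<String, Long> latest = new HashMap<>();
        for (int s = 0; s < active; s++) {
            for (Rec r : segments.get(s)) {
                latest.merge(r.key(), r.offset(), Long::max);
            }
        }
        List<List<Rec>> out = new ArrayList<>();
        for (int s = 0; s < active; s++) {
            List<Rec> kept = new ArrayList<>();
            for (Rec r : segments.get(s)) {
                if (r.offset() == latest.get(r.key())) {
                    kept.add(r); // newest record for this key among closed segments
                }
            }
            out.add(kept);
        }
        out.add(segments.get(active)); // the active segment is left exactly as written
        return out;
    }

    public static void main(String[] args) {
        List<List<Rec>> log = List.of(
                List.of(new Rec(0, "k1", "a"), new Rec(1, "k2", "b"), new Rec(2, "k1", "c")),
                List.of(new Rec(3, "k2", "d"), new Rec(4, "k1", "e")),
                List.of(new Rec(5, "k1", "f"))); // active segment
        // k1 survives twice (offsets 4 and 5) until the active segment rolls and is cleaned.
        compact(log).forEach(System.out::println);
    }
}
```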

regards.

On Fri, Jul 14, 2017 at 9:53 PM, Eno Thereska <en...@gmail.com>
wrote:

> None of these questions are naive, so no worries. Answer inline:
>
> > During restore why does Kafka replay the whole topic / partition to
> > recreate the state in the local state store ? Isn't there any way to just
> > have the latest message as the current state ? Because that's what it is ..
> > right ? The last message in the topic / partition IS the latest state. May
> > be I am missing something obvious ?
> >
>
> Let's say Kafka streams is doing an aggregate, e.g., computing sum(). For
> each key, it will compute the new sum() as new records arrive and store the
> result in the changelog topic in Kafka as well as it keeps a copy on
> RocksDB locally. Now, after a failure, a fresh instance comes along with no
> local state in RocksDB. It's necessary to re-construct that state. You are
> right that only the latest value for a key is needed. That is accomplished
> since the changelog topic is a compacted topic, and Kafka will do the
> compaction and keep only the latest value for a key. So what you are saying
> is effectively happening.
>
> Note that if state restoration ends up being too long for your application
> needs, consider using standby tasks:
> http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-standby-replicas
>
> Hope this helps,
> Eno
>




Re: State management & restore functionality

Posted by Eno Thereska <en...@gmail.com>.
None of these questions are naive, so no worries. Answer inline:

> During restore why does Kafka replay the whole topic / partition to recreate the state in the local state store ? Isn't there any way to just have the latest message as the current state ? Because that's what it is .. right ? The last message in the topic / partition IS the latest state. May be I am missing something obvious ?
> 

Let's say Kafka Streams is doing an aggregation, e.g., computing sum(). For each key, it computes the new sum() as new records arrive and writes the result to the changelog topic in Kafka, while also keeping a copy locally in RocksDB. Now, after a failure, a fresh instance comes along with no local state in RocksDB, so that state has to be reconstructed. You are right that only the latest value for each key is needed. This is accomplished because the changelog topic is a compacted topic: Kafka compacts it and keeps only the latest value for each key. So what you are describing is effectively what happens.
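A minimal model of the restore path, under simplifying assumptions: a `HashMap` stands in for RocksDB, a list of key/value records stands in for one changelog partition, and a null value is treated as a delete (tombstone), as in compacted topics. Replaying the full log and replaying its compacted form give the same store, which is why compaction shortens restoration without changing its result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of state restoration; a HashMap stands in for the local RocksDB store.
final class ChangelogRestore {
    record Rec(String key, Long value) {}

    // Replays the changelog in order; later records overwrite earlier ones for the same key.
    static Map<String, Long> restore(List<Rec> changelog) {
        Map<String, Long> store = new HashMap<>();
        for (Rec r : changelog) {
            if (r.value() == null) {
                store.remove(r.key()); // tombstone: the key was deleted
            } else {
                store.put(r.key(), r.value());
            }
        }
        return store;
    }

    // What full compaction leaves behind: only the last record per key, in log order.
    static List<Rec> compact(List<Rec> changelog) {
        Map<String, Integer> lastIndex = new HashMap<>();
        for (int i = 0; i < changelog.size(); i++) {
            lastIndex.put(changelog.get(i).key(), i);
        }
        List<Rec> out = new ArrayList<>();
        for (int i = 0; i < changelog.size(); i++) {
            if (lastIndex.get(changelog.get(i).key()) == i) {
                out.add(changelog.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Running sums per key, as a sum() aggregation would write them to its changelog.
        List<Rec> log = List.of(new Rec("a", 1L), new Rec("b", 2L), new Rec("a", 3L),
                                new Rec("b", 5L), new Rec("a", 6L));
        System.out.println(restore(log).equals(restore(compact(log)))); // true
    }
}
```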

Note that if state restoration takes too long for your application's needs, consider using standby tasks: http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-standby-replicas
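Standby replicas are controlled by the `num.standby.replicas` setting (default 0; the constant is `StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG`). The sketch below uses only `java.util.Properties` with string keys to stay dependency-free; the application id and broker address are placeholders, and in a real application these properties would be passed to the `KafkaStreams` constructor.

```java
import java.util.Properties;

final class StandbyConfig {
    // Builds Streams properties with standby replicas enabled (values are placeholders).
    static Properties streamsProps(int standbyReplicas) {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // placeholder application id
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        // Number of standby replicas kept per task on other instances (default is 0).
        props.put("num.standby.replicas", Integer.toString(standbyReplicas));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps(1).getProperty("num.standby.replicas")); // prints 1
    }
}
```

A standby keeps its copy of the store warm by consuming the same changelog, so on failover the new active task has little or nothing left to replay.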

Hope this helps,
Eno



Re: State management & restore functionality

Posted by Debasish Ghosh <gh...@gmail.com>.
Thanks Eno.

Regarding the merging part, I was talking about merging topics using
Kafka Streams only, so that is safe, as you mentioned.

Regarding the restore part, I have another question. Maybe it's a bit
naive too.

During restore, why does Kafka replay the whole topic / partition to
recreate the state in the local state store? Isn't there any way to just
take the latest message as the current state? That's what it is, right?
The last message in the topic / partition IS the latest state. Maybe
I am missing something obvious?

regards.

On Fri, Jul 14, 2017 at 6:23 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Debasish,
>
> Your intuition about the first part is correct. Kafka Streams
> automatically assigns a partition of a topic to a task in an instance.
> It will never be the case that the same partition is assigned to two tasks.
>
> About the merging or changing of partitions part, it would help if we know
> more about what you are trying to do. For example, if behind the scenes you
> add or remove partitions that would not work well with Kafka Streams.
> However, if you use the Kafka Streams itself to create new topics (e.g., by
> merging two topics into one, or vice versa by taking one topic and splitting
> it into more topics), then that would work fine.
>
> Eno



Re: State management & restore functionality

Posted by Eno Thereska <en...@gmail.com>.
Hi Debasish,

Your intuition about the first part is correct. Kafka Streams automatically assigns each partition of a topic to
a task in an instance. It will never be the case that the same partition is assigned to two tasks.

About the merging or changing of partitions part, it would help if we knew more about what you
are trying to do. For example, if you add or remove partitions behind the scenes, that would not work
well with Kafka Streams. However, if you use Kafka Streams itself to create new topics (e.g.,
by merging two topics into one, or vice versa by taking one topic and splitting it into more topics), then
that would work fine.

Eno

> On 13 Jul 2017, at 23:49, Debasish Ghosh <gh...@gmail.com> wrote:
> 
> Hi -
> 
> I have a question which is mostly to clarify some conceptions regarding
> state management and restore functionality using Kafka Streams ..
> 
> When I have multiple instances of the same application running (same
> application id for each of the instances), are the following assumptions
> correct ?
> 
>   1. each instance has a separate state store (local)
>   2. all instances are backed up by a *single* changelog topic
> 
> Now the question arises, how does restore work in the above case when we
> have 1 changelog topic backing up multiple state stores ?
> 
> Each instance of the application ingests data from specific partitions of
> the topic. And there can be multiple topics too. e.g. if we have m topics
> with n partitions in each, and p instances of the application, then all the
> (m x n) partitions are distributed across the p instances of the
> application. Is this true ?
> 
> If so, then does the changelog topic also has (m x n) partitions, so that
> Kafka knows which state to restore in which store in case of a restore
> operation ?
> 
> And finally, if we decide to merge topics / partitions in between without
> complete reset of the application, will (a) it work ? and (b) the changelog
> topic gets updated accordingly and (c) is this recommended ?
> 
> regards.
> 