You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Ning Zhang <ni...@gmail.com> on 2021/01/01 23:26:50 UTC

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

The physical store behind "state store" is change-log kafka topic. In Kafka stream, if something fails in the middle, the "state store" is restored back to the state before the event happens at the first step / beginning of the stream.

 

On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote: 
> Hi All,
> 
> We use Kafka streams and may need to use exactly-once configuration for
> some of the use cases. Currently, the application uses either local or
> global state store to store state.
>  So, the application will consume events from source kafka topic, process
> the events, for state stores it will use either local or global state store
> of kafka, then produce events onto the destination topic.
> 
> Question i have is: in the case of exactly-once setting, kafka streams
> guarantees that all actions happen or nothing happens. So, in this case,
> any state stored on the local or global state store will also be counted
> under 'all or nothing' guarantee e.g. if event is consumed and state store
> is updated, however some issue occurs before event is produced on
> destination topic then will state store be restored back to the state
> before it was updated for this event?
> 

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by "Matthias J. Sax" <mj...@apache.org>.
Well, as the ticket suggests, you cannot use a remote store with EOS.
There is no workaround. We would need to address the ticket you linked
to allow using remote stores.

Btw: this ticket might actually be more relevant for supporting remove
stores: https://issues.apache.org/jira/browse/KAFKA-12549

-Matthias

On 3/19/21 9:31 AM, Pushkar Deole wrote:
> Matthias,
> 
> With reference to your response above, i came across the JIRA ticket
> https://issues.apache.org/jira/browse/KAFKA-12475
> 
> For rocksDB or in-memory state stores, these are always backed by changelog
> topic, so they can be rebuilt from scratch from the changelog topic.
> However, how a remote state store can be made consistent in case of error
> e.g. stream consumed event from source topic, processed and stored state to
> redis, and before producing event to destination topic application dies. In
> this case, offset won't be committed to source topic and destination topics
> anyway doesn't have the processed event, however redis holds the new state.
> How can redis be wiped off the state that was saved while processing above
> event(s) ?
> 
> On Wed, Jan 6, 2021 at 11:18 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> Well, that is exactly what I mean by "it depends on the state store
>> implementation".
>>
>> For this case, you cannot get exactly-once.
>>
>> There are actually ideas to improve the implementation to support the
>> case you describe, but there is no timeline for this change yet. Not
>> even sure if there is already a Jira ticket...
>>
>>
>> -Matthias
>>
>> On 1/6/21 2:32 AM, Pushkar Deole wrote:
>>> The question is if we want to use state store of 3rd party, e.g. say
>> Redis,
>>> how can the store be consistent with rest of the system i.e. source and
>>> destination topics...
>>>
>>> e.g. record is consumed from source, processed, state store updated with
>>> some state, but before writing to destination there is failure
>>> Now, in this case, with kafka state store, it will be wiped off the state
>>> stored since the transaction failed.
>>>
>>> But with Redis, the state store is updated with the new state and there
>> is
>>> no way to revert back
>>>
>>> On Tue, Jan 5, 2021 at 11:11 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>
>>>> It depends on the store implementation. Atm, EOS for state store is
>>>> achieved by re-creating the state store in case of failure from the
>>>> changelog topic.
>>>>
>>>> For RocksDB stores, we wipe out the local state directories and create a
>>>> new empty RocksDB and for in-memory stores the content is "lost" anyway
>>>> when state is migrated, and we reply the changelog into an empty store
>>>> before processing resumes.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 1/5/21 6:27 AM, Alex Craig wrote:
>>>>> I don't think he's asking about data-loss, but rather data consistency.
>>>>> (in the event of an exception or application crash, will EOS ensure
>> that
>>>>> the state store data is consistent)  My understanding is that it DOES
>>>> apply
>>>>> to state stores as well, in the sense that a failure during processing
>>>>> would mean that the commit wouldn't get flushed and therefore wouldn't
>>>> get
>>>>> double-counted once processing resumes and message is re-processed.
>>>>> As far as using something other than RocksDB, I think as long as you
>> are
>>>>> implementing the state store API correctly you should be fine.  I did a
>>>> POC
>>>>> recently using Mongo state-stores with EOS enabled and it worked
>>>> correctly,
>>>>> even when I intentionally introduced failures and crashes.
>>>>>
>>>>> -alex
>>>>>
>>>>> On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang <ni...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> If there is a "change-log" topic to back up the state store, then it
>> may
>>>>>> not lose data.
>>>>>>
>>>>>> Also, if the third party store is not "kafka community certified" (or
>>>> not
>>>>>> well-maintained), it may have chances to lose data (in different
>> ways).
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 2021/01/05 04:56:12, Pushkar Deole <pd...@gmail.com> wrote:
>>>>>>> In case we opt to choose some third party store instead of kafka's
>>>> stores
>>>>>>> for storing state (e.g. Redis cache or Ignite), then will we lose the
>>>>>>> exactly-once guarantee provided by kafka and the state stores can be
>> in
>>>>>> an
>>>>>>> inconsistent state ?
>>>>>>>
>>>>>>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>> The physical store behind "state store" is change-log kafka topic.
>> In
>>>>>>>> Kafka stream, if something fails in the middle, the "state store" is
>>>>>>>> restored back to the state before the event happens at the first
>> step
>>>> /
>>>>>>>> beginning of the stream.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> We use Kafka streams and may need to use exactly-once configuration
>>>>>> for
>>>>>>>>> some of the use cases. Currently, the application uses either local
>>>>>> or
>>>>>>>>> global state store to store state.
>>>>>>>>>  So, the application will consume events from source kafka topic,
>>>>>> process
>>>>>>>>> the events, for state stores it will use either local or global
>> state
>>>>>>>> store
>>>>>>>>> of kafka, then produce events onto the destination topic.
>>>>>>>>>
>>>>>>>>> Question i have is: in the case of exactly-once setting, kafka
>>>>>> streams
>>>>>>>>> guarantees that all actions happen or nothing happens. So, in this
>>>>>> case,
>>>>>>>>> any state stored on the local or global state store will also be
>>>>>> counted
>>>>>>>>> under 'all or nothing' guarantee e.g. if event is consumed and
>> state
>>>>>>>> store
>>>>>>>>> is updated, however some issue occurs before event is produced on
>>>>>>>>> destination topic then will state store be restored back to the
>> state
>>>>>>>>> before it was updated for this event?
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,

With reference to your response above, i came across the JIRA ticket
https://issues.apache.org/jira/browse/KAFKA-12475

For rocksDB or in-memory state stores, these are always backed by changelog
topic, so they can be rebuilt from scratch from the changelog topic.
However, how a remote state store can be made consistent in case of error
e.g. stream consumed event from source topic, processed and stored state to
redis, and before producing event to destination topic application dies. In
this case, offset won't be committed to source topic and destination topics
anyway doesn't have the processed event, however redis holds the new state.
How can redis be wiped off the state that was saved while processing above
event(s) ?

On Wed, Jan 6, 2021 at 11:18 PM Matthias J. Sax <mj...@apache.org> wrote:

> Well, that is exactly what I mean by "it depends on the state store
> implementation".
>
> For this case, you cannot get exactly-once.
>
> There are actually ideas to improve the implementation to support the
> case you describe, but there is no timeline for this change yet. Not
> even sure if there is already a Jira ticket...
>
>
> -Matthias
>
> On 1/6/21 2:32 AM, Pushkar Deole wrote:
> > The question is if we want to use state store of 3rd party, e.g. say
> Redis,
> > how can the store be consistent with rest of the system i.e. source and
> > destination topics...
> >
> > e.g. record is consumed from source, processed, state store updated with
> > some state, but before writing to destination there is failure
> > Now, in this case, with kafka state store, it will be wiped off the state
> > stored since the transaction failed.
> >
> > But with Redis, the state store is updated with the new state and there
> is
> > no way to revert back
> >
> > On Tue, Jan 5, 2021 at 11:11 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> >> It depends on the store implementation. Atm, EOS for state store is
> >> achieved by re-creating the state store in case of failure from the
> >> changelog topic.
> >>
> >> For RocksDB stores, we wipe out the local state directories and create a
> >> new empty RocksDB and for in-memory stores the content is "lost" anyway
> >> when state is migrated, and we reply the changelog into an empty store
> >> before processing resumes.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/5/21 6:27 AM, Alex Craig wrote:
> >>> I don't think he's asking about data-loss, but rather data consistency.
> >>> (in the event of an exception or application crash, will EOS ensure
> that
> >>> the state store data is consistent)  My understanding is that it DOES
> >> apply
> >>> to state stores as well, in the sense that a failure during processing
> >>> would mean that the commit wouldn't get flushed and therefore wouldn't
> >> get
> >>> double-counted once processing resumes and message is re-processed.
> >>> As far as using something other than RocksDB, I think as long as you
> are
> >>> implementing the state store API correctly you should be fine.  I did a
> >> POC
> >>> recently using Mongo state-stores with EOS enabled and it worked
> >> correctly,
> >>> even when I intentionally introduced failures and crashes.
> >>>
> >>> -alex
> >>>
> >>> On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang <ni...@gmail.com>
> >> wrote:
> >>>
> >>>> If there is a "change-log" topic to back up the state store, then it
> may
> >>>> not lose data.
> >>>>
> >>>> Also, if the third party store is not "kafka community certified" (or
> >> not
> >>>> well-maintained), it may have chances to lose data (in different
> ways).
> >>>>
> >>>>
> >>>>
> >>>> On 2021/01/05 04:56:12, Pushkar Deole <pd...@gmail.com> wrote:
> >>>>> In case we opt to choose some third party store instead of kafka's
> >> stores
> >>>>> for storing state (e.g. Redis cache or Ignite), then will we lose the
> >>>>> exactly-once guarantee provided by kafka and the state stores can be
> in
> >>>> an
> >>>>> inconsistent state ?
> >>>>>
> >>>>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> The physical store behind "state store" is change-log kafka topic.
> In
> >>>>>> Kafka stream, if something fails in the middle, the "state store" is
> >>>>>> restored back to the state before the event happens at the first
> step
> >> /
> >>>>>> beginning of the stream.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
> >>>>>>> Hi All,
> >>>>>>>
> >>>>>>> We use Kafka streams and may need to use exactly-once configuration
> >>>> for
> >>>>>>> some of the use cases. Currently, the application uses either local
> >>>> or
> >>>>>>> global state store to store state.
> >>>>>>>  So, the application will consume events from source kafka topic,
> >>>> process
> >>>>>>> the events, for state stores it will use either local or global
> state
> >>>>>> store
> >>>>>>> of kafka, then produce events onto the destination topic.
> >>>>>>>
> >>>>>>> Question i have is: in the case of exactly-once setting, kafka
> >>>> streams
> >>>>>>> guarantees that all actions happen or nothing happens. So, in this
> >>>> case,
> >>>>>>> any state stored on the local or global state store will also be
> >>>> counted
> >>>>>>> under 'all or nothing' guarantee e.g. if event is consumed and
> state
> >>>>>> store
> >>>>>>> is updated, however some issue occurs before event is produced on
> >>>>>>> destination topic then will state store be restored back to the
> state
> >>>>>>> before it was updated for this event?
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by "Matthias J. Sax" <mj...@apache.org>.
If you can make the write idempotent, you should be fine.

For this case, you might need to catch exceptions on INSERT INTO` and
convert into `UPDATE` statement.


-Matthias

On 1/19/21 5:49 AM, Pushkar Deole wrote:
> Is there also a way to avoid duplicates if the application consumer from
> kafka topic and writes the events to database?
> e.g. in case the application restarts while processing a batch read from
> topic and few events already written to database, when application
> restarts, those events are again consumed by another instance and written
> back to database.
> 
> Could this behavior be avoided somehow without putting constraints on
> database table?
> 
> On Wed, Jan 6, 2021 at 11:18 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> Well, that is exactly what I mean by "it depends on the state store
>> implementation".
>>
>> For this case, you cannot get exactly-once.
>>
>> There are actually ideas to improve the implementation to support the
>> case you describe, but there is no timeline for this change yet. Not
>> even sure if there is already a Jira ticket...
>>
>>
>> -Matthias
>>
>> On 1/6/21 2:32 AM, Pushkar Deole wrote:
>>> The question is if we want to use state store of 3rd party, e.g. say
>> Redis,
>>> how can the store be consistent with rest of the system i.e. source and
>>> destination topics...
>>>
>>> e.g. record is consumed from source, processed, state store updated with
>>> some state, but before writing to destination there is failure
>>> Now, in this case, with kafka state store, it will be wiped off the state
>>> stored since the transaction failed.
>>>
>>> But with Redis, the state store is updated with the new state and there
>> is
>>> no way to revert back
>>>
>>> On Tue, Jan 5, 2021 at 11:11 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>
>>>> It depends on the store implementation. Atm, EOS for state store is
>>>> achieved by re-creating the state store in case of failure from the
>>>> changelog topic.
>>>>
>>>> For RocksDB stores, we wipe out the local state directories and create a
>>>> new empty RocksDB and for in-memory stores the content is "lost" anyway
>>>> when state is migrated, and we reply the changelog into an empty store
>>>> before processing resumes.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 1/5/21 6:27 AM, Alex Craig wrote:
>>>>> I don't think he's asking about data-loss, but rather data consistency.
>>>>> (in the event of an exception or application crash, will EOS ensure
>> that
>>>>> the state store data is consistent)  My understanding is that it DOES
>>>> apply
>>>>> to state stores as well, in the sense that a failure during processing
>>>>> would mean that the commit wouldn't get flushed and therefore wouldn't
>>>> get
>>>>> double-counted once processing resumes and message is re-processed.
>>>>> As far as using something other than RocksDB, I think as long as you
>> are
>>>>> implementing the state store API correctly you should be fine.  I did a
>>>> POC
>>>>> recently using Mongo state-stores with EOS enabled and it worked
>>>> correctly,
>>>>> even when I intentionally introduced failures and crashes.
>>>>>
>>>>> -alex
>>>>>
>>>>> On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang <ni...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> If there is a "change-log" topic to back up the state store, then it
>> may
>>>>>> not lose data.
>>>>>>
>>>>>> Also, if the third party store is not "kafka community certified" (or
>>>> not
>>>>>> well-maintained), it may have chances to lose data (in different
>> ways).
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 2021/01/05 04:56:12, Pushkar Deole <pd...@gmail.com> wrote:
>>>>>>> In case we opt to choose some third party store instead of kafka's
>>>> stores
>>>>>>> for storing state (e.g. Redis cache or Ignite), then will we lose the
>>>>>>> exactly-once guarantee provided by kafka and the state stores can be
>> in
>>>>>> an
>>>>>>> inconsistent state ?
>>>>>>>
>>>>>>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>> The physical store behind "state store" is change-log kafka topic.
>> In
>>>>>>>> Kafka stream, if something fails in the middle, the "state store" is
>>>>>>>> restored back to the state before the event happens at the first
>> step
>>>> /
>>>>>>>> beginning of the stream.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> We use Kafka streams and may need to use exactly-once configuration
>>>>>> for
>>>>>>>>> some of the use cases. Currently, the application uses either local
>>>>>> or
>>>>>>>>> global state store to store state.
>>>>>>>>>  So, the application will consume events from source kafka topic,
>>>>>> process
>>>>>>>>> the events, for state stores it will use either local or global
>> state
>>>>>>>> store
>>>>>>>>> of kafka, then produce events onto the destination topic.
>>>>>>>>>
>>>>>>>>> Question i have is: in the case of exactly-once setting, kafka
>>>>>> streams
>>>>>>>>> guarantees that all actions happen or nothing happens. So, in this
>>>>>> case,
>>>>>>>>> any state stored on the local or global state store will also be
>>>>>> counted
>>>>>>>>> under 'all or nothing' guarantee e.g. if event is consumed and
>> state
>>>>>>>> store
>>>>>>>>> is updated, however some issue occurs before event is produced on
>>>>>>>>> destination topic then will state store be restored back to the
>> state
>>>>>>>>> before it was updated for this event?
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by Pushkar Deole <pd...@gmail.com>.
Is there also a way to avoid duplicates if the application consumer from
kafka topic and writes the events to database?
e.g. in case the application restarts while processing a batch read from
topic and few events already written to database, when application
restarts, those events are again consumed by another instance and written
back to database.

Could this behavior be avoided somehow without putting constraints on
database table?

On Wed, Jan 6, 2021 at 11:18 PM Matthias J. Sax <mj...@apache.org> wrote:

> Well, that is exactly what I mean by "it depends on the state store
> implementation".
>
> For this case, you cannot get exactly-once.
>
> There are actually ideas to improve the implementation to support the
> case you describe, but there is no timeline for this change yet. Not
> even sure if there is already a Jira ticket...
>
>
> -Matthias
>
> On 1/6/21 2:32 AM, Pushkar Deole wrote:
> > The question is if we want to use state store of 3rd party, e.g. say
> Redis,
> > how can the store be consistent with rest of the system i.e. source and
> > destination topics...
> >
> > e.g. record is consumed from source, processed, state store updated with
> > some state, but before writing to destination there is failure
> > Now, in this case, with kafka state store, it will be wiped off the state
> > stored since the transaction failed.
> >
> > But with Redis, the state store is updated with the new state and there
> is
> > no way to revert back
> >
> > On Tue, Jan 5, 2021 at 11:11 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> >> It depends on the store implementation. Atm, EOS for state store is
> >> achieved by re-creating the state store in case of failure from the
> >> changelog topic.
> >>
> >> For RocksDB stores, we wipe out the local state directories and create a
> >> new empty RocksDB and for in-memory stores the content is "lost" anyway
> >> when state is migrated, and we reply the changelog into an empty store
> >> before processing resumes.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/5/21 6:27 AM, Alex Craig wrote:
> >>> I don't think he's asking about data-loss, but rather data consistency.
> >>> (in the event of an exception or application crash, will EOS ensure
> that
> >>> the state store data is consistent)  My understanding is that it DOES
> >> apply
> >>> to state stores as well, in the sense that a failure during processing
> >>> would mean that the commit wouldn't get flushed and therefore wouldn't
> >> get
> >>> double-counted once processing resumes and message is re-processed.
> >>> As far as using something other than RocksDB, I think as long as you
> are
> >>> implementing the state store API correctly you should be fine.  I did a
> >> POC
> >>> recently using Mongo state-stores with EOS enabled and it worked
> >> correctly,
> >>> even when I intentionally introduced failures and crashes.
> >>>
> >>> -alex
> >>>
> >>> On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang <ni...@gmail.com>
> >> wrote:
> >>>
> >>>> If there is a "change-log" topic to back up the state store, then it
> may
> >>>> not lose data.
> >>>>
> >>>> Also, if the third party store is not "kafka community certified" (or
> >> not
> >>>> well-maintained), it may have chances to lose data (in different
> ways).
> >>>>
> >>>>
> >>>>
> >>>> On 2021/01/05 04:56:12, Pushkar Deole <pd...@gmail.com> wrote:
> >>>>> In case we opt to choose some third party store instead of kafka's
> >> stores
> >>>>> for storing state (e.g. Redis cache or Ignite), then will we lose the
> >>>>> exactly-once guarantee provided by kafka and the state stores can be
> in
> >>>> an
> >>>>> inconsistent state ?
> >>>>>
> >>>>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> The physical store behind "state store" is change-log kafka topic.
> In
> >>>>>> Kafka stream, if something fails in the middle, the "state store" is
> >>>>>> restored back to the state before the event happens at the first
> step
> >> /
> >>>>>> beginning of the stream.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
> >>>>>>> Hi All,
> >>>>>>>
> >>>>>>> We use Kafka streams and may need to use exactly-once configuration
> >>>> for
> >>>>>>> some of the use cases. Currently, the application uses either local
> >>>> or
> >>>>>>> global state store to store state.
> >>>>>>>  So, the application will consume events from source kafka topic,
> >>>> process
> >>>>>>> the events, for state stores it will use either local or global
> state
> >>>>>> store
> >>>>>>> of kafka, then produce events onto the destination topic.
> >>>>>>>
> >>>>>>> Question i have is: in the case of exactly-once setting, kafka
> >>>> streams
> >>>>>>> guarantees that all actions happen or nothing happens. So, in this
> >>>> case,
> >>>>>>> any state stored on the local or global state store will also be
> >>>> counted
> >>>>>>> under 'all or nothing' guarantee e.g. if event is consumed and
> state
> >>>>>> store
> >>>>>>> is updated, however some issue occurs before event is produced on
> >>>>>>> destination topic then will state store be restored back to the
> state
> >>>>>>> before it was updated for this event?
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by "Matthias J. Sax" <mj...@apache.org>.
Well, that is exactly what I mean by "it depends on the state store
implementation".

For this case, you cannot get exactly-once.

There are actually ideas to improve the implementation to support the
case you describe, but there is no timeline for this change yet. Not
even sure if there is already a Jira ticket...


-Matthias

On 1/6/21 2:32 AM, Pushkar Deole wrote:
> The question is if we want to use state store of 3rd party, e.g. say Redis,
> how can the store be consistent with rest of the system i.e. source and
> destination topics...
> 
> e.g. record is consumed from source, processed, state store updated with
> some state, but before writing to destination there is failure
> Now, in this case, with kafka state store, it will be wiped off the state
> stored since the transaction failed.
> 
> But with Redis, the state store is updated with the new state and there is
> no way to revert back
> 
> On Tue, Jan 5, 2021 at 11:11 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> It depends on the store implementation. Atm, EOS for state store is
>> achieved by re-creating the state store in case of failure from the
>> changelog topic.
>>
>> For RocksDB stores, we wipe out the local state directories and create a
>> new empty RocksDB and for in-memory stores the content is "lost" anyway
>> when state is migrated, and we reply the changelog into an empty store
>> before processing resumes.
>>
>>
>> -Matthias
>>
>> On 1/5/21 6:27 AM, Alex Craig wrote:
>>> I don't think he's asking about data-loss, but rather data consistency.
>>> (in the event of an exception or application crash, will EOS ensure that
>>> the state store data is consistent)  My understanding is that it DOES
>> apply
>>> to state stores as well, in the sense that a failure during processing
>>> would mean that the commit wouldn't get flushed and therefore wouldn't
>> get
>>> double-counted once processing resumes and message is re-processed.
>>> As far as using something other than RocksDB, I think as long as you are
>>> implementing the state store API correctly you should be fine.  I did a
>> POC
>>> recently using Mongo state-stores with EOS enabled and it worked
>> correctly,
>>> even when I intentionally introduced failures and crashes.
>>>
>>> -alex
>>>
>>> On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang <ni...@gmail.com>
>> wrote:
>>>
>>>> If there is a "change-log" topic to back up the state store, then it may
>>>> not lose data.
>>>>
>>>> Also, if the third party store is not "kafka community certified" (or
>> not
>>>> well-maintained), it may have chances to lose data (in different ways).
>>>>
>>>>
>>>>
>>>> On 2021/01/05 04:56:12, Pushkar Deole <pd...@gmail.com> wrote:
>>>>> In case we opt to choose some third party store instead of kafka's
>> stores
>>>>> for storing state (e.g. Redis cache or Ignite), then will we lose the
>>>>> exactly-once guarantee provided by kafka and the state stores can be in
>>>> an
>>>>> inconsistent state ?
>>>>>
>>>>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> The physical store behind "state store" is change-log kafka topic. In
>>>>>> Kafka stream, if something fails in the middle, the "state store" is
>>>>>> restored back to the state before the event happens at the first step
>> /
>>>>>> beginning of the stream.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
>>>>>>> Hi All,
>>>>>>>
>>>>>>> We use Kafka streams and may need to use exactly-once configuration
>>>> for
>>>>>>> some of the use cases. Currently, the application uses either local
>>>> or
>>>>>>> global state store to store state.
>>>>>>>  So, the application will consume events from source kafka topic,
>>>> process
>>>>>>> the events, for state stores it will use either local or global state
>>>>>> store
>>>>>>> of kafka, then produce events onto the destination topic.
>>>>>>>
>>>>>>> Question i have is: in the case of exactly-once setting, kafka
>>>> streams
>>>>>>> guarantees that all actions happen or nothing happens. So, in this
>>>> case,
>>>>>>> any state stored on the local or global state store will also be
>>>> counted
>>>>>>> under 'all or nothing' guarantee e.g. if event is consumed and state
>>>>>> store
>>>>>>> is updated, however some issue occurs before event is produced on
>>>>>>> destination topic then will state store be restored back to the state
>>>>>>> before it was updated for this event?
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by Pushkar Deole <pd...@gmail.com>.
The question is if we want to use state store of 3rd party, e.g. say Redis,
how can the store be consistent with rest of the system i.e. source and
destination topics...

e.g. record is consumed from source, processed, state store updated with
some state, but before writing to destination there is failure
Now, in this case, with kafka state store, it will be wiped off the state
stored since the transaction failed.

But with Redis, the state store is updated with the new state and there is
no way to revert back

On Tue, Jan 5, 2021 at 11:11 PM Matthias J. Sax <mj...@apache.org> wrote:

> It depends on the store implementation. Atm, EOS for state store is
> achieved by re-creating the state store in case of failure from the
> changelog topic.
>
> For RocksDB stores, we wipe out the local state directories and create a
> new empty RocksDB and for in-memory stores the content is "lost" anyway
> when state is migrated, and we reply the changelog into an empty store
> before processing resumes.
>
>
> -Matthias
>
> On 1/5/21 6:27 AM, Alex Craig wrote:
> > I don't think he's asking about data-loss, but rather data consistency.
> > (in the event of an exception or application crash, will EOS ensure that
> > the state store data is consistent)  My understanding is that it DOES
> apply
> > to state stores as well, in the sense that a failure during processing
> > would mean that the commit wouldn't get flushed and therefore wouldn't
> get
> > double-counted once processing resumes and message is re-processed.
> > As far as using something other than RocksDB, I think as long as you are
> > implementing the state store API correctly you should be fine.  I did a
> POC
> > recently using Mongo state-stores with EOS enabled and it worked
> correctly,
> > even when I intentionally introduced failures and crashes.
> >
> > -alex
> >
> > On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang <ni...@gmail.com>
> wrote:
> >
> >> If there is a "change-log" topic to back up the state store, then it may
> >> not lose data.
> >>
> >> Also, if the third party store is not "kafka community certified" (or
> not
> >> well-maintained), it may have chances to lose data (in different ways).
> >>
> >>
> >>
> >> On 2021/01/05 04:56:12, Pushkar Deole <pd...@gmail.com> wrote:
> >>> In case we opt to choose some third party store instead of kafka's
> stores
> >>> for storing state (e.g. Redis cache or Ignite), then will we lose the
> >>> exactly-once guarantee provided by kafka and the state stores can be in
> >> an
> >>> inconsistent state ?
> >>>
> >>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com>
> >> wrote:
> >>>
> >>>> The physical store behind "state store" is change-log kafka topic. In
> >>>> Kafka stream, if something fails in the middle, the "state store" is
> >>>> restored back to the state before the event happens at the first step
> /
> >>>> beginning of the stream.
> >>>>
> >>>>
> >>>>
> >>>> On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
> >>>>> Hi All,
> >>>>>
> >>>>> We use Kafka streams and may need to use exactly-once configuration
> >> for
> >>>>> some of the use cases. Currently, the application uses either local
> >> or
> >>>>> global state store to store state.
> >>>>>  So, the application will consume events from source kafka topic,
> >> process
> >>>>> the events, for state stores it will use either local or global state
> >>>> store
> >>>>> of kafka, then produce events onto the destination topic.
> >>>>>
> >>>>> Question i have is: in the case of exactly-once setting, kafka
> >> streams
> >>>>> guarantees that all actions happen or nothing happens. So, in this
> >> case,
> >>>>> any state stored on the local or global state store will also be
> >> counted
> >>>>> under 'all or nothing' guarantee e.g. if event is consumed and state
> >>>> store
> >>>>> is updated, however some issue occurs before event is produced on
> >>>>> destination topic then will state store be restored back to the state
> >>>>> before it was updated for this event?
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by "Matthias J. Sax" <mj...@apache.org>.
It depends on the store implementation. Atm, EOS for state store is
achieved by re-creating the state store in case of failure from the
changelog topic.

For RocksDB stores, we wipe out the local state directories and create a
new empty RocksDB and for in-memory stores the content is "lost" anyway
when state is migrated, and we reply the changelog into an empty store
before processing resumes.


-Matthias

On 1/5/21 6:27 AM, Alex Craig wrote:
> I don't think he's asking about data-loss, but rather data consistency.
> (in the event of an exception or application crash, will EOS ensure that
> the state store data is consistent)  My understanding is that it DOES apply
> to state stores as well, in the sense that a failure during processing
> would mean that the commit wouldn't get flushed and therefore wouldn't get
> double-counted once processing resumes and message is re-processed.
> As far as using something other than RocksDB, I think as long as you are
> implementing the state store API correctly you should be fine.  I did a POC
> recently using Mongo state-stores with EOS enabled and it worked correctly,
> even when I intentionally introduced failures and crashes.
> 
> -alex
> 
> On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang <ni...@gmail.com> wrote:
> 
>> If there is a "change-log" topic to back up the state store, then it may
>> not lose data.
>>
>> Also, if the third party store is not "kafka community certified" (or not
>> well-maintained), it may have chances to lose data (in different ways).
>>
>>
>>
>> On 2021/01/05 04:56:12, Pushkar Deole <pd...@gmail.com> wrote:
>>> In case we opt to choose some third party store instead of kafka's stores
>>> for storing state (e.g. Redis cache or Ignite), then will we lose the
>>> exactly-once guarantee provided by kafka and the state stores can be in
>> an
>>> inconsistent state ?
>>>
>>> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com>
>> wrote:
>>>
>>>> The physical store behind "state store" is change-log kafka topic. In
>>>> Kafka stream, if something fails in the middle, the "state store" is
>>>> restored back to the state before the event happens at the first step /
>>>> beginning of the stream.
>>>>
>>>>
>>>>
>>>> On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
>>>>> Hi All,
>>>>>
>>>>> We use Kafka streams and may need to use exactly-once configuration
>> for
>>>>> some of the use cases. Currently, the application uses either local
>> or
>>>>> global state store to store state.
>>>>>  So, the application will consume events from source kafka topic,
>> process
>>>>> the events, for state stores it will use either local or global state
>>>> store
>>>>> of kafka, then produce events onto the destination topic.
>>>>>
>>>>> Question i have is: in the case of exactly-once setting, kafka
>> streams
>>>>> guarantees that all actions happen or nothing happens. So, in this
>> case,
>>>>> any state stored on the local or global state store will also be
>> counted
>>>>> under 'all or nothing' guarantee e.g. if event is consumed and state
>>>> store
>>>>> is updated, however some issue occurs before event is produced on
>>>>> destination topic then will state store be restored back to the state
>>>>> before it was updated for this event?
>>>>>
>>>>
>>>
>>
> 

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by Alex Craig <al...@gmail.com>.
I don't think he's asking about data-loss, but rather data consistency.
(in the event of an exception or application crash, will EOS ensure that
the state store data is consistent)  My understanding is that it DOES apply
to state stores as well, in the sense that a failure during processing
would mean that the commit wouldn't get flushed and therefore wouldn't get
double-counted once processing resumes and message is re-processed.
As far as using something other than RocksDB, I think as long as you are
implementing the state store API correctly you should be fine.  I did a POC
recently using Mongo state-stores with EOS enabled and it worked correctly,
even when I intentionally introduced failures and crashes.

-alex

On Tue, Jan 5, 2021 at 1:09 AM Ning Zhang <ni...@gmail.com> wrote:

> If there is a "change-log" topic to back up the state store, then it may
> not lose data.
>
> Also, if the third party store is not "kafka community certified" (or not
> well-maintained), it may have chances to lose data (in different ways).
>
>
>
> On 2021/01/05 04:56:12, Pushkar Deole <pd...@gmail.com> wrote:
> > In case we opt to choose some third party store instead of kafka's stores
> > for storing state (e.g. Redis cache or Ignite), then will we lose the
> > exactly-once guarantee provided by kafka and the state stores can be in
> an
> > inconsistent state ?
> >
> > On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com>
> wrote:
> >
> > > The physical store behind "state store" is change-log kafka topic. In
> > > Kafka stream, if something fails in the middle, the "state store" is
> > > restored back to the state before the event happens at the first step /
> > > beginning of the stream.
> > >
> > >
> > >
> > > On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
> > > > Hi All,
> > > >
> > > > We use Kafka streams and may need to use exactly-once configuration
> for
> > > > some of the use cases. Currently, the application uses either local
> or
> > > > global state store to store state.
> > > >  So, the application will consume events from source kafka topic,
> process
> > > > the events, for state stores it will use either local or global state
> > > store
> > > > of kafka, then produce events onto the destination topic.
> > > >
> > > > Question i have is: in the case of exactly-once setting, kafka
> streams
> > > > guarantees that all actions happen or nothing happens. So, in this
> case,
> > > > any state stored on the local or global state store will also be
> counted
> > > > under 'all or nothing' guarantee e.g. if event is consumed and state
> > > store
> > > > is updated, however some issue occurs before event is produced on
> > > > destination topic then will state store be restored back to the state
> > > > before it was updated for this event?
> > > >
> > >
> >
>

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by Ning Zhang <ni...@gmail.com>.
If there is a "change-log" topic to back up the state store, then it may not lose data.

Also, if the third party store is not "kafka community certified" (or not well-maintained), it may have chances to lose data (in different ways).

 

On 2021/01/05 04:56:12, Pushkar Deole <pd...@gmail.com> wrote: 
> In case we opt to choose some third party store instead of kafka's stores
> for storing state (e.g. Redis cache or Ignite), then will we lose the
> exactly-once guarantee provided by kafka and the state stores can be in an
> inconsistent state ?
> 
> On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com> wrote:
> 
> > The physical store behind "state store" is change-log kafka topic. In
> > Kafka stream, if something fails in the middle, the "state store" is
> > restored back to the state before the event happens at the first step /
> > beginning of the stream.
> >
> >
> >
> > On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
> > > Hi All,
> > >
> > > We use Kafka streams and may need to use exactly-once configuration for
> > > some of the use cases. Currently, the application uses either local or
> > > global state store to store state.
> > >  So, the application will consume events from source kafka topic, process
> > > the events, for state stores it will use either local or global state
> > store
> > > of kafka, then produce events onto the destination topic.
> > >
> > > Question i have is: in the case of exactly-once setting, kafka streams
> > > guarantees that all actions happen or nothing happens. So, in this case,
> > > any state stored on the local or global state store will also be counted
> > > under 'all or nothing' guarantee e.g. if event is consumed and state
> > store
> > > is updated, however some issue occurs before event is produced on
> > > destination topic then will state store be restored back to the state
> > > before it was updated for this event?
> > >
> >
> 

Re: does Kafka exactly-once guarantee also apply to kafka state stores?

Posted by Pushkar Deole <pd...@gmail.com>.
In case we opt to choose some third party store instead of kafka's stores
for storing state (e.g. Redis cache or Ignite), then will we lose the
exactly-once guarantee provided by kafka and the state stores can be in an
inconsistent state ?

On Sat, Jan 2, 2021 at 4:56 AM Ning Zhang <ni...@gmail.com> wrote:

> The physical store behind "state store" is change-log kafka topic. In
> Kafka stream, if something fails in the middle, the "state store" is
> restored back to the state before the event happens at the first step /
> beginning of the stream.
>
>
>
> On 2020/12/31 08:48:16, Pushkar Deole <pd...@gmail.com> wrote:
> > Hi All,
> >
> > We use Kafka streams and may need to use exactly-once configuration for
> > some of the use cases. Currently, the application uses either local or
> > global state store to store state.
> >  So, the application will consume events from source kafka topic, process
> > the events, for state stores it will use either local or global state
> store
> > of kafka, then produce events onto the destination topic.
> >
> > Question i have is: in the case of exactly-once setting, kafka streams
> > guarantees that all actions happen or nothing happens. So, in this case,
> > any state stored on the local or global state store will also be counted
> > under 'all or nothing' guarantee e.g. if event is consumed and state
> store
> > is updated, however some issue occurs before event is produced on
> > destination topic then will state store be restored back to the state
> > before it was updated for this event?
> >
>