Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2022/12/05 16:52:50 UTC

Re: same keys appearing in state stores on different pods when using branches in kafka streams

John or Matthias,

Can you help here? We are frequently getting errors like the one below:

org.apache.kafka.streams.errors.InvalidStateStoreException: The state
store, records, may have migrated to another instance.

For the same key, the record exists in the 'totals' state store but not in
the 'records' state store.

John,

Can you provide more details on the groupBy option?

On Tue, Nov 29, 2022 at 12:24 PM Pushkar Deole <pd...@gmail.com> wrote:

> Hi John,
>
> I am not sure I understood correctly. Even with branching that uses a
> different state store, the key of the incoming event is still the same, so we
> expect it to land in the local state store on the same pod. For example:
>
> 1st event with OPEN status and key xyz comes in, is processed through one
> branch, and is stored in the 'totals' state store. State is maintained in the
> local state store on the same pod.
> 2nd event with OPEN status and key xyz comes in, is processed again, and is
> stored in 'totals'. State is maintained in the local state store on the same
> pod.
>
> 3rd event with CLOSED status and key xyz comes in and is processed. The
> state is stored in the 'records' state store, and it is expected to be stored
> in the state store on the same pod.
> Why would it go to some other pod?
>
> On Wed, Nov 23, 2022 at 8:50 PM John Roesler <vv...@apache.org> wrote:
>
>> Hi Pushkar,
>>
>> Thanks for the question. I think that what’s happening is that, even
>> though both branches use the same grouping logic, Streams can’t detect that
>> they are the same. It just sees two group-bys and therefore introduces two
>> repartitions, with a separate downstream task for each.
>>
>> You might want to print out the topology description and visualize it
>> with https://zz85.github.io/kafka-streams-viz/ . That will show you
>> whether the stores wind up in the same task or not.
>>
>> The visualization will also show you the names of the input topics for
>> those two partitions, which you can use in conjunction with the metadata
>> methods on your KafkaStreams instance to query for the location of the keys
>> in both stores.
>>
>> I suspect that with some tweaks you can re-write the topology to just
>> have one downstream task, if that’s what you prefer.
>>
>> By the way, I think you could propose to add an optimization to make the
>> groupBy behave the way you expected. If that’s interesting to you, let us
>> know and we can give you some pointers!
>>
>> I hope this helps,
>> John
>>
>> On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
>> > Hi All,
>> >
>> > I have a stream application that creates 2 branches. Each branch includes
>> > a state store, where the status field of the Kafka message determines the
>> > branch, and therefore the state store used:
>> >
>> > Status OPEN = State store name totals
>> >
>> > Status CLOSED = State store name records
>> >
>> > I’m seeing that the streams application is running on a pod; however, I’m
>> > getting the exception:
>> >
>> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state
>> > store, records, may have migrated to another instance.
>> >
>> > If I physically access the pod and check the RocksDB folders, I do not see
>> > the state store folder. If I check the keys in the totals state store on
>> > this pod, I can find the key in the records state store on another pod. I
>> > had assumed that because the keys of the events are the same, the same
>> > partition would be used for the two branches and therefore the same keys in
>> > these two state stores would be created on the same Kubernetes pod. This is
>> > not an issue for the Kafka stream, but that assumption was used in the way
>> > the state stores are read. I assumed that if I found the key in the 'totals'
>> > state store, the same key would be found on the same pod in the 'records'
>> > state store.
>> >
>> >
>> >
>> > The questions I have are:
>> >
>> > 1) Is it expected that the state stores can hold the partition data on
>> > different pods, and is this unique to streams using branch?
>> >
>> > 2) Is there a way to know if the state store is on the pod to avoid
>> > handling this as an exception?
>> >
>> >
>> >
>> > Here is the topology of the stream in question:
>> >
>> >         KStream<String, ConsolidatedIntervalTotalsModel>[] branches = stream
>> >             .peek(receivingEventLogger)
>> >             .selectKey(keyMapper)
>> >             .mapValues(totalsValueMapper)
>> >             .filter(nullKeyValueEventFilter)
>> >             .branch(
>> >                 (k, v) -> RecordStatus.CLOSED.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())
>> >                     || RecordStatus.LB_RDELETE.name().equalsIgnoreCase(v.getCurrent().getRecordStatus()),
>> >                 (k, v) -> true);
>> >
>> >         // CLOSED and LB_RDELETE branch writes to records state store
>> >         branches[0]
>> >             .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
>> >             .aggregate(totalsInitializer, totalsAggregator, materializedRecords)
>> >             .toStream()
>> >             .map(totalsInternalKeyValueMapper)
>> >             .filter(nullKeyStringValueEventFilter)
>> >             .to(loopbackTopic.name());
>> >
>> >         // DEFAULT branch writes to totals state store
>> >         branches[1]
>> >             .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
>> >             .aggregate(totalsInitializer, totalsAggregator, materializedTotals)
>> >             .toStream()
>> >             .flatMap(totalsKeyValueMapper)
>> >             .filter(nullKeyStringValueEventFilter)
>> >             .peek(sendingEventLogger)
>> >             .to(toTopic.name());
>>
>
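
To illustrate John's point in the quoted reply about "two repartitions, with a
separate downstream task for each", here is a minimal, self-contained model in
plain Java. It is only a sketch under stated assumptions, not how Kafka Streams
actually assigns tasks: the pod names, the round-robin placement, the partition
count of 4, the use of String.hashCode() in place of Kafka's real partitioner,
and the numbering of sub-topologies (1 hosting 'records', 2 hosting 'totals')
are all hypothetical. What it shows is that a key can map to the same partition
number in both repartition topics and still be handled by two different tasks,
which may live on different pods.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TaskPlacementSketch {

    // Stand-in for key-based partitioning. This is NOT Kafka's default
    // partitioner; it only guarantees "same key -> same partition number".
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical placement: one task per (sub-topology, partition) pair,
    // named "<subTopology>_<partition>", handed out round-robin to pods.
    static Map<String, String> assign(int subTopologies, int partitions, List<String> pods) {
        Map<String, String> ownerByTask = new LinkedHashMap<>();
        int next = 0;
        for (int s = 0; s < subTopologies; s++) {
            for (int p = 0; p < partitions; p++) {
                ownerByTask.put(s + "_" + p, pods.get(next++ % pods.size()));
            }
        }
        return ownerByTask;
    }

    public static void main(String[] args) {
        int partitions = 4;
        Map<String, String> ownerByTask =
                assign(3, partitions, List.of("pod-a", "pod-b", "pod-c"));

        // The key lands in the same partition number of both repartition topics...
        int p = partitionFor("xyz", partitions);

        // ...but each store belongs to a different task (assumed numbering).
        String recordsTask = "1_" + p;
        String totalsTask = "2_" + p;
        System.out.println("records: task " + recordsTask + " on " + ownerByTask.get(recordsTask));
        System.out.println("totals:  task " + totalsTask + " on " + ownerByTask.get(totalsTask));
    }
}
```

In this model the two tasks for the key's partition end up on different pods,
which matches the symptom described in this thread: the key is found in
'totals' locally while 'records' for the same key is hosted elsewhere.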

Re: same keys appearing in state stores on different pods when using branches in kafka streams

Posted by Pushkar Deole <pd...@gmail.com>.
Thanks, John R.

I am adding John Brackin to this thread; he can provide further details of
the topology description.

On Tue, Dec 6, 2022 at 8:28 AM John Roesler <vv...@apache.org> wrote:

> Hi Pushkar,
>
> I'm sorry for the delay. I'm afraid I'm having trouble picturing the
> situation. Can you provide the topology description? That will show us
> whether we should expect the stores to always be in the same instances or
> not. If you can also include a simplified version of your program, we might
> be able to provide some suggestions.
>
> Thanks,
> -John

Re: same keys appearing in state stores on different pods when using branches in kafka streams

Posted by John Roesler <vv...@apache.org>.
Hi Pushkar,

I'm sorry for the delay. I'm afraid I'm having trouble picturing the situation. Can you provide the topology description? That will show us whether we should expect the stores to always be in the same instances or not. If you can also include a simplified version of your program, we might be able to provide some suggestions.

Thanks,
-John
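
For readers following along, the topology description requested here is the
text printed by Topology#describe(). As a rough sketch of how one might check
it for the question in this thread, the plain-Java helper below scans such a
text and reports which sub-topology each state store appears in. The class and
method names are made up for illustration, and the sample text is hand-written
in the general shape of a description (processor names and sub-topology numbers
are assumptions), not output from the application discussed in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StoreSubTopologySketch {

    private static final Pattern SUB_TOPOLOGY = Pattern.compile("Sub-topology:\\s*(\\d+)");
    private static final Pattern STORES = Pattern.compile("stores:\\s*\\[([^\\]]*)\\]");

    // Maps each store name found in the description text to the id of the
    // sub-topology under which it is listed.
    static Map<String, Integer> storesBySubTopology(String description) {
        Map<String, Integer> result = new LinkedHashMap<>();
        int current = -1; // -1 until the first "Sub-topology:" line is seen
        for (String line : description.split("\\R")) {
            Matcher sub = SUB_TOPOLOGY.matcher(line);
            if (sub.find()) {
                current = Integer.parseInt(sub.group(1));
                continue;
            }
            Matcher stores = STORES.matcher(line);
            if (stores.find()) {
                for (String name : stores.group(1).split(",")) {
                    String trimmed = name.trim();
                    if (!trimmed.isEmpty()) {
                        result.put(trimmed, current);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hand-written sample, NOT real output from the poster's application.
        String sample = String.join("\n",
                "Topologies:",
                "   Sub-topology: 0",
                "    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])",
                "  Sub-topology: 1",
                "    Processor: KSTREAM-AGGREGATE-0000000008 (stores: [records])",
                "  Sub-topology: 2",
                "    Processor: KSTREAM-AGGREGATE-0000000016 (stores: [totals])");

        Map<String, Integer> bySubTopology = storesBySubTopology(sample);
        System.out.println(bySubTopology);
        System.out.println("same sub-topology: "
                + bySubTopology.get("records").equals(bySubTopology.get("totals")));
    }
}
```

If the two stores show up under different sub-topologies, they are served by
different tasks and should not be expected to sit on the same instance. In that
case, KafkaStreams#queryMetadataForKey can be used to ask which instance hosts
a given key for a given store before querying it.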
