Posted to users@kafka.apache.org by Jan Bols <ja...@telenet.be> on 2020/01/02 21:46:55 UTC

complicated logic for tombstone records

Hi,
I have a rather complicated Kafka Streams application involving multiple
joins, aggregates, maps, etc. At a certain point, parts of the data need to
be removed throughout the entire streams topology: in the topics, the
changelogs, and the RocksDB state stores.

Managing this requires a lot of effort, and things get very complex. For
example, when a KStream record has a null value and is aggregated, you first
need to convert it into some optional value instead, because aggregates
ignore nulls.
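
The wrapper approach described above can be pictured with a plain-Java sketch. This is only a model, not Kafka Streams code: `WrapperAggregationSketch`, `DeleteAware`, `entryOf`, and `aggregate` are hypothetical names, and the map-based loop merely imitates two assumed behaviors, namely that records with a raw null value never reach the aggregator and that an aggregator returning null removes the key from the result.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only; none of these names come from Kafka Streams.
final class WrapperAggregationSketch {

    /** Hypothetical wrapper: carries either a payload or an explicit delete marker. */
    static final class DeleteAware<V> {
        private final V value;        // null only for a delete marker
        private final boolean deleted;

        private DeleteAware(V value, boolean deleted) {
            this.value = value;
            this.deleted = deleted;
        }

        static <V> DeleteAware<V> of(V value) {
            return new DeleteAware<>(value, false);
        }

        static <V> DeleteAware<V> tombstone() {
            return new DeleteAware<>(null, true);
        }

        boolean isDeleted() {
            return deleted;
        }

        V value() {
            return value;
        }
    }

    /** Builds one key/value pair; the value may be null to model a raw tombstone. */
    static Map.Entry<String, DeleteAware<Integer>> entryOf(String key, DeleteAware<Integer> value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    /** Toy stand-in for a per-key sum aggregation that honors delete markers. */
    static Map<String, Integer> aggregate(List<Map.Entry<String, DeleteAware<Integer>>> entries) {
        Map<String, Integer> table = new HashMap<>();
        for (Map.Entry<String, DeleteAware<Integer>> e : entries) {
            DeleteAware<Integer> wrapped = e.getValue();
            if (wrapped == null) {
                continue; // a raw null value is dropped before the aggregator sees it
            }
            if (wrapped.isDeleted()) {
                table.remove(e.getKey()); // models the aggregator returning null
            } else {
                table.merge(e.getKey(), wrapped.value(), Integer::sum);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, DeleteAware<Integer>>> entries = new ArrayList<>();
        entries.add(entryOf("a", DeleteAware.of(1)));
        entries.add(entryOf("a", DeleteAware.of(2)));
        entries.add(entryOf("b", DeleteAware.of(5)));
        entries.add(entryOf("b", null));                    // ignored: "b" keeps its sum
        entries.add(entryOf("a", DeleteAware.tombstone())); // explicit delete: "a" is removed
        System.out.println(aggregate(entries));             // prints {b=5}
    }
}
```

The cost mentioned in the message shows up here as well: every value type that passes through an aggregation has to be wrapped and unwrapped, and the wrapper needs its own serde in a real topology.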

Is there a better way or a way that does not impact all the existing
streaming logic?

I was thinking about having an out-of-band process that sends null values
to all topics with the correct keys. I could then filter out all null
values before doing the rest of the existing stream logic.
Would that make sense?

I can send null values to all my topics, but how do I find the changelog
topics created by Kafka Streams? And what about the state stores?
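
On the naming question: to my knowledge, Kafka Streams derives its internal topic names from the application id, with changelog topics following the pattern `<application.id>-<store name>-changelog`. The sketch below only does the string handling; `InternalTopicNames`, the application id `my-app`, and the store name `orders-agg` are made up for illustration. When a DSL operator is not given an explicit store name, the store name is generated, so filtering an actual topic listing is likely more reliable than guessing names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class InternalTopicNames {

    /** Changelog topic name for a named state store, following the pattern above. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /** Picks the likely changelog topics of one application out of a topic listing. */
    static List<String> changelogTopicsOf(String applicationId, List<String> allTopics) {
        String prefix = applicationId + "-";
        return allTopics.stream()
                // Heuristic: another application id starting with the same prefix would also match.
                .filter(topic -> topic.startsWith(prefix) && topic.endsWith("-changelog"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(changelogTopic("my-app", "orders-agg"));
        // prints my-app-orders-agg-changelog

        List<String> topics = Arrays.asList(
                "orders",
                "my-app-orders-agg-changelog",
                "my-app-orders-agg-repartition",
                "other-app-orders-agg-changelog");
        System.out.println(changelogTopicsOf("my-app", topics));
        // prints [my-app-orders-agg-changelog]
    }
}
```

Knowing the topic names is only half of the answer: as noted in the replies in this thread, writing tombstones to a changelog topic does not clear the matching entries in the local state store.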

Best regards
Jan

Re: complicated logic for tombstone records

Posted by Jan Bols <ja...@telenet.be>.
Hi Boyang, Hi Alex,

Thank you for your replies. I can't use windowing, so currently I'm managing
removals by wrapping messages in a delete-aware wrapper whenever I have to
do aggregation, but this has a big impact on all the logic.

For me the ideal situation would be to get a handle on the state stores
that are used by aggregations and other processors of the Streams DSL, and
to programmatically delete entries from those stores whenever needed. This
way I can keep the changes to my streaming logic minimal and still delete
parts of the data whenever needed.

Is there any way to do that? I know I can get a read-only reference to the
state stores using queryable stores but that won't do.

Jan

On Thu, Jan 2, 2020 at 11:17 PM Alex Brekken <br...@gmail.com> wrote:

> [...]

Re: complicated logic for tombstone records

Posted by Alex Brekken <br...@gmail.com>.
Hi Jan, unfortunately there is no easy or automatic way to do this.
Publishing null values directly to the changelog topics will remove those
records from the topics, but it won't remove the corresponding rows from the
RocksDB state stores. (Deleting data programmatically from a state store,
on the other hand, WILL also remove it from the changelog topic.) Given that
you want to completely remove the data for a given set of keys, your best
option might be to modify your topology to handle null messages, published
from an outside app, so that they get deleted from your aggregations.
Hopefully this isn't too self-serving, but I wrote a blog post about
managing state-store data not long ago:
https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/
Hopefully that gives you some ideas.
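
The asymmetry described above can be modeled in a few lines of plain Java. This is a toy, not how Kafka Streams is implemented: `LoggedStoreSketch` and its methods are hypothetical, a `HashMap` stands in for the RocksDB store, and a list of `key=value` strings stands in for the changelog topic. It assumes that writes and deletes on the store are mirrored to the changelog, while records appended to the changelog from outside are not applied to the local store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a changelog-backed store; not Kafka Streams code.
final class LoggedStoreSketch {
    private final Map<String, String> local = new HashMap<>();  // stands in for RocksDB
    private final List<String> changelog = new ArrayList<>();   // stands in for the changelog topic

    /** A write through the store is mirrored to the changelog. */
    void put(String key, String value) {
        local.put(key, value);
        changelog.add(key + "=" + value);
    }

    /** A delete through the store removes the row and logs a tombstone. */
    void delete(String key) {
        local.remove(key);
        changelog.add(key + "=null");
    }

    /** An outside producer appends a tombstone; the local store is not touched. */
    void publishTombstoneToChangelogOnly(String key) {
        changelog.add(key + "=null");
    }

    String get(String key) {
        return local.get(key);
    }

    List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        LoggedStoreSketch store = new LoggedStoreSketch();
        store.put("k1", "v1");

        store.publishTombstoneToChangelogOnly("k1");
        System.out.println(store.get("k1")); // prints v1: the local row survives

        store.delete("k1");
        System.out.println(store.get("k1")); // prints null: the row is gone
        System.out.println(store.changelog()); // prints [k1=v1, k1=null, k1=null]
    }
}
```

In a real application the externally published tombstone would, to my understanding, only take effect when the store is rebuilt from its changelog, which is why the delete has to go through the topology itself.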

Alex

On Thu, Jan 2, 2020 at 4:11 PM Boyang Chen <re...@gmail.com> wrote:

> [...]

Re: complicated logic for tombstone records

Posted by Boyang Chen <re...@gmail.com>.
Hey Jan,

Although I believe your case is much more complicated, would time-based
retention work for you at all? If yes, a time window store is probably the
best option.

If not, Streams has no out-of-the-box solution for invalidating an
aggregation record. It seems we could at least provide an API to inject
tombstone records into the aggregation logic so that they don't get
ignored. This sounds like good future work.

Boyang

On Thu, Jan 2, 2020 at 1:47 PM Jan Bols <ja...@telenet.be> wrote:

> [...]