Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2019/12/10 13:04:41 UTC

Is there a way to prevent duplicate messages to downstream

Hi,
I am using Kafka Streams and I get messages of the form (K, V):
(A, a), (B, b), (C, c), (A, a), (C, c), (A, a) .....
I want to define a topology that would filter out these duplicate messages
coming from upstream.

I want to know if this is possible. The code I have written to do this is
something like this:

source.groupBy((k, v) -> new Key(k, v))
      .reduce((av, nv) -> nv)
      .toStream()

So basically I create a new key which is a combination of the existing (k, v),
group by that composite key, and reduce it to a table that just stores the
latest value. Finally I convert that table back to a stream to be used downstream.
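
(For reference, Key here is just a small composite class along these lines -
the field types are only an example and mirror my actual key/value types; it
needs equals/hashCode so the grouping works, plus a serde because groupBy
triggers a repartition:)

public class Key {
    private final String k;
    private final String v;

    public Key(String k, String v) {
        this.k = k;
        this.v = v;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Key)) return false;
        Key other = (Key) o;
        return k.equals(other.k) && v.equals(other.v);
    }

    @Override
    public int hashCode() {
        return 31 * k.hashCode() + v.hashCode();
    }
}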

My question is: would this logic work?
For example, if I get another message (A, a), it would simply replace the
existing (A, a) in the table, and no new message would get appended to the
resulting stream.

Is my understanding correct?

If not then is there any other way to achieve this?

Thanks
Sachin

Re: Is there a way to prevent duplicate messages to downstream

Posted by Alex Brekken <br...@gmail.com>.
I've never used that dedup transformer before, but what you've got looks
right. (Though if there's a way to hash your message value, or somehow get
a GUID out of it, that might be preferable.)  As you probably noticed, its
state is windowed - so if your use case depends on being able to remove
duplicate events over a large (or infinite) time span, then this solution
probably isn't a good fit.
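
By "hash your message value" I just mean deriving a compact, stable id from
the value and deduplicating on that instead of the whole value - something
along these lines (sketch only; ValueHasher is a made-up helper, and how you
turn your value into a stable string depends on your types):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper: hashes the serialized value so the dedup store holds
// small, fixed-size ids instead of full payloads.
public final class ValueHasher {

    public static String sha256(final String serializedValue) {
        try {
            final MessageDigest digest = MessageDigest.getInstance("SHA-256");
            final byte[] hash = digest.digest(serializedValue.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is available on every standard JVM
            throw new IllegalStateException(e);
        }
    }
}

You would then use (key, value) -> ValueHasher.sha256(value.toString()) as the
id extractor instead of (key, value) -> value.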

Alex

Re: Is there a way to prevent duplicate messages to downstream

Posted by Sachin Mittal <sj...@gmail.com>.
Hi Alex,
Thanks for the quick response.
What I have is around 8 streams branched from a single source stream, which
down the line get joined back into one.
Each branched stream can contain duplicates, so when I join all this data I
end up with what feels like an endless number of tuples.

So I was thinking: if I can remove all the duplicates right at the start,
then I will have a manageable amount of data for the joins.

So I checked the code from that example, and wanted to know how it can be
inserted into my existing pipeline. Basically my current code is something
like this:
Properties props = new Properties();
....
final StreamsBuilder builder = new StreamsBuilder();
final KStream<K, V> input = builder.stream("input-topic");
input.... //my pipeline starts
.....
final Topology topology = builder.build(props);
final KafkaStreams streams = new KafkaStreams(topology, props);
......
streams.start();

This will change to:
Properties props = new Properties();
....
final StreamsBuilder builder = new StreamsBuilder();
final String storeName = "dedup-store"; // same name used when building the window store
final StoreBuilder<WindowStore<V, Long>> dedupStoreBuilder = ..... // event id -> timestamp
builder.addStateStore(dedupStoreBuilder); // register the store so the transformer can find it
.....
final KStream<K, V> input = builder.stream("input-topic");
final KStream<K, V> deduplicated = input.transform(
    () -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
    storeName); // pass the store name so transform() attaches the store to this processor
deduplicated.... //my existing pipeline
..... //rest same as before
streams.start();
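
For completeness, I am assuming the store builder itself would look roughly
like this, following the Confluent test (here I am pretending the event id is
a String, and the 10-minute window/retention values are just examples of how
long duplicates should be remembered):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

// A persistent window store keyed by event id, holding the event timestamp.
// "dedup-store" must match the name passed to transform().
final Duration windowSize = Duration.ofMinutes(10);
final Duration retentionPeriod = windowSize;
final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore("dedup-store", retentionPeriod, windowSize, false),
        Serdes.String(),  // serde for the event id
        Serdes.Long());   // serde for the stored timestamp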

Let me know if I got this right.

Thanks
Sachin


Re: Is there a way to prevent duplicate messages to downstream

Posted by Alex Brekken <br...@gmail.com>.
Hi Sachin, is your goal to prevent any records with a duplicate key from
ever getting sent downstream?  The KTable you have in your example will of
course hold the most recent record for a given key, but it will still emit
updates.  So if key "A" arrives a second time (with no change to the
value), it will still be emitted. (Depending on how rapidly you get the
duplicate events, some might be absorbed by internal caching, but you will
still likely get at least one of those duplicates sent further downstream
through the topology.)  Take a look at this example from Confluent to see
if it would work for your case:
https://github.com/confluentinc/kafka-streams-examples/blob/5.3.1-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
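
The core of it is a Transformer backed by a windowed store of event ids: if
the id is already in the store the record is dropped, otherwise it is
remembered and forwarded. Very roughly, from memory (the linked test is the
authoritative version, so treat this as a sketch - names like "dedup-store"
are placeholders):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class DeduplicationTransformer<K, V, E> implements Transformer<K, V, KeyValue<K, V>> {

    private static final String STORE_NAME = "dedup-store"; // must match the registered window store

    private final long leftDurationMs;   // how far back to look for an earlier occurrence
    private final long rightDurationMs;  // how far ahead, to tolerate out-of-order events
    private final KeyValueMapper<K, V, E> idExtractor; // derives the dedup id from (key, value)

    private ProcessorContext context;
    private WindowStore<E, Long> eventIdStore;

    public DeduplicationTransformer(final long maintainDurationMs,
                                    final KeyValueMapper<K, V, E> idExtractor) {
        this.leftDurationMs = maintainDurationMs / 2;
        this.rightDurationMs = maintainDurationMs - leftDurationMs;
        this.idExtractor = idExtractor;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.eventIdStore = (WindowStore<E, Long>) context.getStateStore(STORE_NAME);
    }

    @Override
    public KeyValue<K, V> transform(final K key, final V value) {
        final E eventId = idExtractor.apply(key, value);
        if (eventId == null) {
            return KeyValue.pair(key, value); // no id to dedup on, pass the record through
        }
        if (isDuplicate(eventId)) {
            return null; // duplicate within the window, forward nothing
        }
        eventIdStore.put(eventId, context.timestamp(), context.timestamp());
        return KeyValue.pair(key, value);
    }

    private boolean isDuplicate(final E eventId) {
        final long eventTime = context.timestamp();
        try (final WindowStoreIterator<Long> seen =
                 eventIdStore.fetch(eventId, eventTime - leftDurationMs, eventTime + rightDurationMs)) {
            return seen.hasNext();
        }
    }

    @Override
    public void close() {
    }
}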


Also, what is the reason for wanting to eliminate the duplicates?  Do you
have downstream aggregators that you don't want incorrectly counting the
duplicated events?

Alex
