You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by "Marasoiu, Nicu" <ni...@metrosystems.net> on 2018/02/27 09:03:32 UTC

question on doing deduplication with KafkaStreams

Hi,
From a programatic perspective, doing a groupByKey.reduce((val1, val2) -> val1) would deduplicate entries, but then I have a few questions: this state would accumulate without limit, right? Should we do a windowing, to eliminate old records be needed, right? Will the state accumulate just in the RocksDB instance, not in memory, right?
Thank you,
Nicu
Geschäftsanschrift/Business address: METRO SYSTEMS GmbH, Metro-Straße 12, 40235 Düsseldorf, Germany
Aufsichtsrat/Supervisory Board: Heiko Hutmacher (Vorsitzender/ Chairman)
Geschäftsführung/Management Board: Dr. Dirk Toepfer (Vorsitzender/CEO), Wim van Herwijnen
Sitz Düsseldorf, Amtsgericht Düsseldorf, HRB 18232/Registered Office Düsseldorf, Commercial Register of the Düsseldorf Local Court, HRB 18232

Betreffend Mails von *@metrosystems.net
Die in dieser E-Mail enthaltenen Nachrichten und Anhänge sind ausschließlich für den bezeichneten Adressaten bestimmt. Sie können rechtlich geschützte, vertrauliche Informationen enthalten. Falls Sie nicht der bezeichnete Empfänger oder zum Empfang dieser E-Mail nicht berechtigt sind, ist die Verwendung, Vervielfältigung oder Weitergabe der Nachrichten und Anhänge untersagt. Falls Sie diese E-Mail irrtümlich erhalten haben, informieren Sie bitte unverzüglich den Absender und vernichten Sie die E-Mail.

Regarding mails from *@metrosystems.net
This e-mail message and any attachment are intended exclusively for the named addressee. They may contain confidential information which may also be protected by professional secrecy. Unless you are the named addressee (or authorised to receive for the addressee) you may not copy or use this message or any attachment or disclose the contents to anyone else. If this e-mail was sent to you by mistake please notify the sender immediately and delete this e-mail.


Re: question on doing deduplication with KafkaStreams

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Nicu,

To eliminate old records you'll want to add an  `until(final long
durationMs)` method to your existing code like so:

stream.groupByKey(Serdes.String(), Serdes.String())
                .windowedBy(SessionWindows.with(TimeUnit.HOURS.toMillis(1)).
*until(TimeUnit.HOURS.toMillis(1))*)
                .reduce((val1, val2) -> val1)

HTH

-Bill

On Tue, Feb 27, 2018 at 8:26 AM, Marasoiu, Nicu <
nicu.marasoiu@metrosystems.net> wrote:

> Hi,
> In the code below I am attempting at deduplicating within one hour, any
> duplicates further apart would remain duplicates. Does this look good to
> you? Thanks
>
> stream.groupByKey(Serdes.String(), Serdes.String())
>                 .windowedBy(SessionWindows.with(TimeUnit.HOURS.toMillis(
> 1)))
>                 .reduce((val1, val2) -> val1)
>
> ________________________________________
> From: Marasoiu, Nicu [nicu.marasoiu@metrosystems.net]
> Sent: Tuesday, February 27, 2018 11:03 AM
> To: users@kafka.apache.org
> Subject: question on doing deduplication with KafkaStreams
>
> Hi,
> From a programatic perspective, doing a groupByKey.reduce((val1, val2) ->
> val1) would deduplicate entries, but then I have a few questions: this
> state would accumulate without limit, right? Should we do a windowing, to
> eliminate old records be needed, right? Will the state accumulate just in
> the RocksDB instance, not in memory, right?
> Thank you,
> Nicu
> Geschäftsanschrift/Business address: METRO SYSTEMS GmbH, Metro-Straße 12,
> 40235 Düsseldorf, Germany
> Aufsichtsrat/Supervisory Board: Heiko Hutmacher (Vorsitzender/ Chairman)
> Geschäftsführung/Management Board: Dr. Dirk Toepfer (Vorsitzender/CEO),
> Wim van Herwijnen
> Sitz Düsseldorf, Amtsgericht Düsseldorf, HRB 18232/Registered Office
> Düsseldorf, Commercial Register of the Düsseldorf Local Court, HRB 18232
>
> Betreffend Mails von *@metrosystems.net
> Die in dieser E-Mail enthaltenen Nachrichten und Anhänge sind
> ausschließlich für den bezeichneten Adressaten bestimmt. Sie können
> rechtlich geschützte, vertrauliche Informationen enthalten. Falls Sie nicht
> der bezeichnete Empfänger oder zum Empfang dieser E-Mail nicht berechtigt
> sind, ist die Verwendung, Vervielfältigung oder Weitergabe der Nachrichten
> und Anhänge untersagt. Falls Sie diese E-Mail irrtümlich erhalten haben,
> informieren Sie bitte unverzüglich den Absender und vernichten Sie die
> E-Mail.
>
> Regarding mails from *@metrosystems.net
> This e-mail message and any attachment are intended exclusively for the
> named addressee. They may contain confidential information which may also
> be protected by professional secrecy. Unless you are the named addressee
> (or authorised to receive for the addressee) you may not copy or use this
> message or any attachment or disclose the contents to anyone else. If this
> e-mail was sent to you by mistake please notify the sender immediately and
> delete this e-mail.
>
> Geschäftsanschrift/Business address: METRO SYSTEMS GmbH, Metro-Straße 12,
> 40235 Düsseldorf, Germany
> Aufsichtsrat/Supervisory Board: Heiko Hutmacher (Vorsitzender/ Chairman)
> Geschäftsführung/Management Board: Dr. Dirk Toepfer (Vorsitzender/CEO),
> Wim van Herwijnen
> Sitz Düsseldorf, Amtsgericht Düsseldorf, HRB 18232/Registered Office
> Düsseldorf, Commercial Register of the Düsseldorf Local Court, HRB 18232
>
> Betreffend Mails von *@metrosystems.net
> Die in dieser E-Mail enthaltenen Nachrichten und Anhänge sind
> ausschließlich für den bezeichneten Adressaten bestimmt. Sie können
> rechtlich geschützte, vertrauliche Informationen enthalten. Falls Sie nicht
> der bezeichnete Empfänger oder zum Empfang dieser E-Mail nicht berechtigt
> sind, ist die Verwendung, Vervielfältigung oder Weitergabe der Nachrichten
> und Anhänge untersagt. Falls Sie diese E-Mail irrtümlich erhalten haben,
> informieren Sie bitte unverzüglich den Absender und vernichten Sie die
> E-Mail.
>
> Regarding mails from *@metrosystems.net
> This e-mail message and any attachment are intended exclusively for the
> named addressee. They may contain confidential information which may also
> be protected by professional secrecy. Unless you are the named addressee
> (or authorised to receive for the addressee) you may not copy or use this
> message or any attachment or disclose the contents to anyone else. If this
> e-mail was sent to you by mistake please notify the sender immediately and
> delete this e-mail.
>
>

RE: question on doing deduplication with KafkaStreams

Posted by "Marasoiu, Nicu" <ni...@metrosystems.net>.
Hi,
In the code below I am attempting at deduplicating within one hour, any duplicates further apart would remain duplicates. Does this look good to you? Thanks

stream.groupByKey(Serdes.String(), Serdes.String())
                .windowedBy(SessionWindows.with(TimeUnit.HOURS.toMillis(1)))
                .reduce((val1, val2) -> val1)

________________________________________
From: Marasoiu, Nicu [nicu.marasoiu@metrosystems.net]
Sent: Tuesday, February 27, 2018 11:03 AM
To: users@kafka.apache.org
Subject: question on doing deduplication with KafkaStreams

Hi,
From a programatic perspective, doing a groupByKey.reduce((val1, val2) -> val1) would deduplicate entries, but then I have a few questions: this state would accumulate without limit, right? Should we do a windowing, to eliminate old records be needed, right? Will the state accumulate just in the RocksDB instance, not in memory, right?
Thank you,
Nicu
Geschäftsanschrift/Business address: METRO SYSTEMS GmbH, Metro-Straße 12, 40235 Düsseldorf, Germany
Aufsichtsrat/Supervisory Board: Heiko Hutmacher (Vorsitzender/ Chairman)
Geschäftsführung/Management Board: Dr. Dirk Toepfer (Vorsitzender/CEO), Wim van Herwijnen
Sitz Düsseldorf, Amtsgericht Düsseldorf, HRB 18232/Registered Office Düsseldorf, Commercial Register of the Düsseldorf Local Court, HRB 18232

Betreffend Mails von *@metrosystems.net
Die in dieser E-Mail enthaltenen Nachrichten und Anhänge sind ausschließlich für den bezeichneten Adressaten bestimmt. Sie können rechtlich geschützte, vertrauliche Informationen enthalten. Falls Sie nicht der bezeichnete Empfänger oder zum Empfang dieser E-Mail nicht berechtigt sind, ist die Verwendung, Vervielfältigung oder Weitergabe der Nachrichten und Anhänge untersagt. Falls Sie diese E-Mail irrtümlich erhalten haben, informieren Sie bitte unverzüglich den Absender und vernichten Sie die E-Mail.

Regarding mails from *@metrosystems.net
This e-mail message and any attachment are intended exclusively for the named addressee. They may contain confidential information which may also be protected by professional secrecy. Unless you are the named addressee (or authorised to receive for the addressee) you may not copy or use this message or any attachment or disclose the contents to anyone else. If this e-mail was sent to you by mistake please notify the sender immediately and delete this e-mail.

Geschäftsanschrift/Business address: METRO SYSTEMS GmbH, Metro-Straße 12, 40235 Düsseldorf, Germany
Aufsichtsrat/Supervisory Board: Heiko Hutmacher (Vorsitzender/ Chairman)
Geschäftsführung/Management Board: Dr. Dirk Toepfer (Vorsitzender/CEO), Wim van Herwijnen
Sitz Düsseldorf, Amtsgericht Düsseldorf, HRB 18232/Registered Office Düsseldorf, Commercial Register of the Düsseldorf Local Court, HRB 18232

Betreffend Mails von *@metrosystems.net
Die in dieser E-Mail enthaltenen Nachrichten und Anhänge sind ausschließlich für den bezeichneten Adressaten bestimmt. Sie können rechtlich geschützte, vertrauliche Informationen enthalten. Falls Sie nicht der bezeichnete Empfänger oder zum Empfang dieser E-Mail nicht berechtigt sind, ist die Verwendung, Vervielfältigung oder Weitergabe der Nachrichten und Anhänge untersagt. Falls Sie diese E-Mail irrtümlich erhalten haben, informieren Sie bitte unverzüglich den Absender und vernichten Sie die E-Mail.

Regarding mails from *@metrosystems.net
This e-mail message and any attachment are intended exclusively for the named addressee. They may contain confidential information which may also be protected by professional secrecy. Unless you are the named addressee (or authorised to receive for the addressee) you may not copy or use this message or any attachment or disclose the contents to anyone else. If this e-mail was sent to you by mistake please notify the sender immediately and delete this e-mail.