Posted to users@kafka.apache.org by "Marasoiu, Nicu" <ni...@metrosystems.net> on 2018/02/21 10:57:24 UTC

committing consumed offsets synchronously (every message)

Hi,
To obtain exactly-once semantics, we are thinking of doing at-least-once processing and then running a compensation mechanism that corrects the results within a few minutes by subtracting the effects of the duplicates. For that to work, it seems that at least the compensation mechanism needs to read from a topic and commit offsets after every message, so that after a failover it does not interpret the events between the latest commit and the present as duplicates. What are the performance implications of this, and what advice would you have for achieving exactly-once behavior (or at least a controllable error)?
Thank you,
Nicu Marasoiu
Geschäftsanschrift/Business address: METRO SYSTEMS GmbH, Metro-Straße 12, 40235 Düsseldorf, Germany
Aufsichtsrat/Supervisory Board: Heiko Hutmacher (Vorsitzender/ Chairman)
Geschäftsführung/Management Board: Dr. Dirk Toepfer (Vorsitzender/CEO), Wim van Herwijnen
Sitz Düsseldorf, Amtsgericht Düsseldorf, HRB 18232/Registered Office Düsseldorf, Commercial Register of the Düsseldorf Local Court, HRB 18232



Re: committing consumed offsets synchronously (every message)

Posted by Sönke Liebau <so...@opencore.com.INVALID>.
Kafka Streams would enable exactly-once processing, yes. But this only
holds true as long as your data stays in Kafka topics. As soon as you
want to write data to an external system, the exactly-once guarantees
no longer hold and you end up with the same issues, so I suspect that
this would only move your issues to a later date. The same goes for
Kafka Connect: without implementing a proper two-phase commit protocol,
I don't think there is any way that true exactly-once processing from
Kafka to an external system is possible.

I don't think there would be a large performance benefit from using
Streams, as I assume (I have never checked, though; maybe someone else
can chime in here) that Streams internally uses the same Java objects
for reading from a topic. If anything, the transaction overhead of
exactly-once processing might slow it down even more.

If you are writing to a traditional database, something like this might
bring you closer to your target:

1. read from Kafka
2. start a transaction in the database
3. update whatever your target table is
4. record a unique id for the record in a "processed" table
5. commit the transaction
6. commit the offset to Kafka

You could do this for batches as well; strictly speaking, there is no
need to limit yourself to one record, though you need to ensure that
you roll back the entire transaction if one record fails. If your job
fails between steps 5 and 6, the records are delivered again after the
restart, and you can use the "processed" table to check whether a
record was already processed and skip it.
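
The steps above could be sketched roughly as follows. This is only an
illustration under stated assumptions, not a definitive implementation:
it uses Python's built-in sqlite3 in place of the real database, leaves
the Kafka calls out (they appear only as comments), and the table names
"processed" and "totals", the function names, and the
"topic-partition-offset" style record id are all made up for the example.

```python
import sqlite3


def make_db():
    """Create an in-memory database with a target table and a "processed" table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed (record_id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE totals (key TEXT PRIMARY KEY, total INTEGER NOT NULL)")
    conn.commit()
    return conn


def process_batch(conn, records):
    """Apply a batch of (record_id, key, amount) tuples, skipping known ids.

    Returns the number of records newly applied. The whole batch runs in
    one transaction, so a failure on any record rolls back all of them.
    """
    applied = 0
    # Steps 2 and 5: "with conn" commits on success and rolls back on error.
    with conn:
        for record_id, key, amount in records:
            # Step 4: claim the id. A duplicate id inserts nothing.
            cur = conn.execute(
                "INSERT OR IGNORE INTO processed (record_id) VALUES (?)",
                (record_id,),
            )
            if cur.rowcount == 0:
                continue  # already applied by an earlier, committed run
            # Step 3: update the target table.
            conn.execute(
                "INSERT OR IGNORE INTO totals (key, total) VALUES (?, 0)", (key,)
            )
            conn.execute(
                "UPDATE totals SET total = total + ? WHERE key = ?", (amount, key)
            )
            applied += 1
    # Step 6 would follow here: commit the Kafka offset only after the
    # database transaction has committed.
    return applied


if __name__ == "__main__":
    conn = make_db()
    batch = [("t-0-0", "a", 5), ("t-0-1", "a", 7), ("t-0-2", "b", 1)]
    print(process_batch(conn, batch))  # first delivery
    print(process_batch(conn, batch))  # redelivery after a crash before step 6
```

Because the id check and the table update share one transaction, a batch
that is redelivered after a failure between steps 5 and 6 changes nothing
the second time.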

Best regards,
Sönke

-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany

RE: committing consumed offsets synchronously (every message)

Posted by "Marasoiu, Nicu" <ni...@metrosystems.net>.
Thank you very much,
Do you think Kafka Streams with the exactly_once setting enabled would perform better than the plain Kafka client with an individual commit per message, as timed below? Perhaps the implementation of exactly-once read-process-write uses other methods and performs better.
Indeed, incrementing a counter per processed message key in our database would be one way of accounting for duplicate processing, but I am not sure how to do this efficiently (without querying the whole table).
Until now I have concentrated on accounting for duplicate keys in topics via a Kafka Streams job. That might be enough only if we turn our main business logic into a pure function and perform the database write through a Kafka connector. Since, as I understand it, both Streams and connectors support exactly-once, that could eliminate duplicate processing downstream of a topic.

Thanks for your help,
Nicu
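
One possible shape for the per-key counter mentioned in the message
above, as a hedged sketch only: it uses Python's built-in sqlite3 as a
stand-in for the real database, and the table "processing_counts", its
columns, and both function names are assumptions made for illustration.
The idea is that the message key is the primary key, so each increment
touches a single row through the index, and the returned count tells the
caller immediately whether this was a repeat, without scanning the table.

```python
import sqlite3


def make_counter_db():
    """Create an in-memory database holding one counter row per message key."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE processing_counts ("
        " message_key TEXT PRIMARY KEY,"
        " times_processed INTEGER NOT NULL)"
    )
    conn.commit()
    return conn


def record_processing(conn, message_key):
    """Increment the counter for one key and return its new value.

    A return value greater than 1 means the key has been processed
    before, so the caller could trigger its compensation logic.
    """
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute(
            "INSERT OR IGNORE INTO processing_counts"
            " (message_key, times_processed) VALUES (?, 0)",
            (message_key,),
        )
        conn.execute(
            "UPDATE processing_counts"
            " SET times_processed = times_processed + 1"
            " WHERE message_key = ?",
            (message_key,),
        )
        (count,) = conn.execute(
            "SELECT times_processed FROM processing_counts WHERE message_key = ?",
            (message_key,),
        ).fetchone()
    return count


if __name__ == "__main__":
    conn = make_counter_db()
    for key in ["order-1", "order-2", "order-1"]:
        print(key, record_processing(conn, key))
```

For the count to be trustworthy, the increment would presumably have to
run in the same database transaction as the business update it accounts
for.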

Re: committing consumed offsets synchronously (every message)

Posted by Sönke Liebau <so...@opencore.com.INVALID>.
Hi Nicu,

Committing after every message, and thus retrieving them with a batch size
of 1, will definitely make a huge difference in performance!
I've rigged a quick (and totally non-academic) test, which came up with the
following numbers:

Batching consumer - Consumed 1000490 records in 5 seconds
Non Batching, commiting consumer - Consumed 1000000 records in 3023 seconds

The first line was a consumer with default settings and automatic offset
commits (enable.auto.commit); the second one retrieved one message per poll
and called commitSync after every message.
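
To put the two figures side by side, here is the arithmetic spelled out
in a few lines of Python. The record counts and durations are the ones
quoted above; the variable names are made up, and since this is a single
informal run, the results are only a rough indication.

```python
# Figures taken from the two test runs quoted above.
batched_records, batched_seconds = 1_000_490, 5
per_message_records, per_message_seconds = 1_000_000, 3023

# Throughput of each consumer in records per second.
batched_rate = batched_records / batched_seconds
per_message_rate = per_message_records / per_message_seconds

# How many times slower the commit-per-message consumer was.
slowdown = batched_rate / per_message_rate

# Average wall-clock cost of one poll plus one commitSync, in milliseconds.
per_message_ms = 1000 * per_message_seconds / per_message_records

print(f"batching consumer:   {batched_rate:,.0f} records/s")
print(f"commit per message:  {per_message_rate:,.1f} records/s")
print(f"slowdown:            about {slowdown:.0f}x")
print(f"cost per message:    about {per_message_ms:.3f} ms")
```

In this run the per-message consumer spent about 3 ms on each record,
which works out to roughly 600 times lower throughput than the batching
consumer.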


I am not sure you actually need this, though. Wouldn't your deduplication
process be able to check in the downstream system whether a specific
message was already processed, and use that to identify duplicates?
Or are you not sending the actual records downstream, but just doing
something like summing or counting them?

It's tough to be more specific without knowing more details, but maybe
that helps a bit already?

Best regards,
Sönke
