Posted to users@kafka.apache.org by "Marasoiu, Nicu" <ni...@metrosystems.net> on 2018/03/12 06:58:47 UTC

transactional behavior offsets+effects

Hi,
We are considering a few flows to achieve "exactly once" processing from an input Kafka topic into a database that stores the results (using the Kafka consumer; we also evaluated Kafka Streams, see the details at the end) and would like to gather your input on them:
(for simplicity, let's assume that any exception exits the process, unless it comes out of step 5)
The outlined flows are executed in a loop.

First flow/solution:
1. read from Kafka
2. start transaction in db
3. update target tables
4. commit transaction
5. commit offset to Kafka
6. if the offset commit failed, attempt another db transaction that reverts the previous one (compensate)
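
To illustrate the window this flow leaves open, here is a minimal Python sketch. It does not use a real Kafka client: the `log` list, the returned committed offset and the `crash_before_offset_commit` flag are hypothetical stand-ins, and an in-memory SQLite table plays the role of the target database. A process exit after step 4 but before step 5 leaves the db updated while the offset is not, so the same record is applied again after a restart.

```python
import sqlite3

def run_flow1(db, log, committed_offset, crash_before_offset_commit=False):
    """Process log[committed_offset:] record by record; return the committed offset."""
    for offset in range(committed_offset, len(log)):
        amount = log[offset]                          # step 1: read from "Kafka"
        with db:                                      # steps 2-4: db transaction, committed on exit
            db.execute("UPDATE totals SET total = total + ? WHERE id = 1", (amount,))
        if crash_before_offset_commit:
            # Simulated process exit between step 4 and step 5
            return committed_offset
        committed_offset = offset + 1                 # step 5: commit offset to "Kafka"
    return committed_offset

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE totals (id INTEGER PRIMARY KEY, total INTEGER)")
db.execute("INSERT INTO totals VALUES (1, 0)")
db.commit()

log = [10, 20, 30]
offset = run_flow1(db, log, 0, crash_before_offset_commit=True)  # exits after the first db commit
offset = run_flow1(db, log, offset)                              # restart from the committed offset
total = db.execute("SELECT total FROM totals WHERE id = 1").fetchone()[0]
print(offset, total)  # the first record was counted twice: 70 instead of 60
```

In this simulation the compensation of step 6 never gets a chance to run, because the process is gone before the failed offset commit can be observed.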

Solution 2 - offsets persisted in the db in the same transaction; the consumer reads from explicit offsets at init
If the consumer can be configured with its offsets before it starts to consume, then this flow would be possible:
0. at consumer process boot, read the latest offsets for the partitions from the db and configure the consumer to start from those.
1. read from Kafka (the first read starts from the explicit offsets, the next polls just continue)
2. start transaction in db
3. update target tables
3'. update an "offsets" table, for consumer group and partition id
4. commit transaction (which includes offsets)
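
The steps above can be sketched as follows. This is only an illustration under stated assumptions: the `log` list stands in for one Kafka partition, the table and column names (`totals`, `offsets`, `grp`, `part`, `next_offset`) and the `fail_at` flag are made up for the example, and SQLite stands in for the real database. The point is that the result and the offset are written in one transaction, so a failure rolls back both and a restart resumes from the stored offset.

```python
import sqlite3

def open_db():
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE totals (id INTEGER PRIMARY KEY, total INTEGER)")
    db.execute("CREATE TABLE offsets (grp TEXT, part INTEGER, next_offset INTEGER, "
               "PRIMARY KEY (grp, part))")
    db.execute("INSERT INTO totals VALUES (1, 0)")
    db.execute("INSERT INTO offsets VALUES ('g1', 0, 0)")
    db.commit()
    return db

def stored_offset(db):
    # Step 0: the position to resume from lives in the db, not in Kafka
    row = db.execute(
        "SELECT next_offset FROM offsets WHERE grp = 'g1' AND part = 0").fetchone()
    return row[0]

def process(db, log, fail_at=None):
    for offset in range(stored_offset(db), len(log)):        # step 1
        try:
            with db:                                          # steps 2 and 4
                db.execute("UPDATE totals SET total = total + ? WHERE id = 1",
                           (log[offset],))                    # step 3
                if offset == fail_at:
                    raise RuntimeError("simulated failure inside the transaction")
                db.execute("UPDATE offsets SET next_offset = ? "
                           "WHERE grp = 'g1' AND part = 0", (offset + 1,))  # step 3'
        except RuntimeError:
            return  # the process exits; the db rolled back result and offset together

db = open_db()
log = [10, 20, 30]
process(db, log, fail_at=1)   # fails while handling the second record
process(db, log)              # restart: resumes from the offset stored in the db
total = db.execute("SELECT total FROM totals WHERE id = 1").fetchone()[0]
print(stored_offset(db), total)
```

After the simulated failure and restart, every record has been applied exactly once in this toy setup.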

Solution 3 - If it were possible to commit an explicit offset value to Kafka for a (partition, consumer group), i.e. not just the current offset but a previously saved one (from step 0), then another flow would be possible, with steps 4 and 5 reversed:
4. commit offset to Kafka
5. commit transaction
6. if the transaction commit failed, attempt to commit the old offset back to Kafka (compensate). Exit or rewind the consumer.
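
A rough sketch of this reversed flow, again without a real Kafka client: `FakeOffsetStore` is a hypothetical stand-in for the group's committed offsets, `fail_db_commit` simulates a failing step 5, and SQLite plays the database. It assumes that an explicit, possibly lower, offset can be committed.

```python
import sqlite3

class FakeOffsetStore:
    """Hypothetical stand-in for the committed offsets of one consumer group."""
    def __init__(self):
        self.committed = {}

    def commit(self, partition, offset):
        # Last write wins, so a commit may also move the offset backwards
        self.committed[partition] = offset

def process_next(db, offsets, partition, log, fail_db_commit=False):
    previous = offsets.committed.get(partition, 0)     # offset saved at step 0
    if previous >= len(log):
        return
    try:
        db.execute("UPDATE totals SET total = total + ? WHERE id = 1",
                   (log[previous],))                   # steps 2-3 (implicit transaction)
        offsets.commit(partition, previous + 1)        # step 4: commit offset first
        if fail_db_commit:
            raise sqlite3.OperationalError("simulated db commit failure")
        db.commit()                                    # step 5: commit transaction
    except sqlite3.Error:
        db.rollback()
        offsets.commit(partition, previous)            # step 6: compensate

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE totals (id INTEGER PRIMARY KEY, total INTEGER)")
db.execute("INSERT INTO totals VALUES (1, 0)")
db.commit()

offsets = FakeOffsetStore()
log = [10, 20, 30]
process_next(db, offsets, 0, log, fail_db_commit=True)
after_failure = (offsets.committed[0],
                 db.execute("SELECT total FROM totals").fetchone()[0])
for _ in log:
    process_next(db, offsets, 0, log)
total = db.execute("SELECT total FROM totals").fetchone()[0]
print(after_failure, offsets.committed[0], total)
```

The sketch only covers a failure that the process survives. If the process dies between step 4 and step 6, the committed offset stays ahead of the db and the record would be skipped.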

Solution 4 - use Kafka Streams configured with exactly-once. This seems to imply that the aggregates (the results of the processing), currently stored in the db, would also need to be duplicated in Kafka as output topics and in local RocksDB instances. Since the data volume is significant even for the aggregates, we are exploring solutions close to exactly once that avoid the cost of storing the result "tables" twice.

Do you see any other possibility? What do you suggest for improving the options above, or what is your advice?
Please advise,
Thank you,
Nicu
Geschäftsanschrift/Business address: METRO SYSTEMS GmbH, Metro-Straße 12, 40235 Düsseldorf, Germany
Aufsichtsrat/Supervisory Board: Heiko Hutmacher (Vorsitzender/ Chairman)
Geschäftsführung/Management Board: Dr. Dirk Toepfer (Vorsitzender/CEO), Wim van Herwijnen
Sitz Düsseldorf, Amtsgericht Düsseldorf, HRB 18232/Registered Office Düsseldorf, Commercial Register of the Düsseldorf Local Court, HRB 18232

Regarding mails from *@metrosystems.net
This e-mail message and any attachment are intended exclusively for the named addressee. They may contain confidential information which may also be protected by professional secrecy. Unless you are the named addressee (or authorised to receive for the addressee) you may not copy or use this message or any attachment or disclose the contents to anyone else. If this e-mail was sent to you by mistake please notify the sender immediately and delete this e-mail.


Re: transactional behavior offsets+effects

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Nicu,

What you described sounds reasonable to me.

In fact, solution 1 would not work perfectly if you have a failure on your
db right after step 5 but before step 6. To make the commit in Kafka and
the txn commit in your sink DB "an atomic operation" together, you need
to either encode the committed offsets into your db as a single operation
(note that if the db txn failed, you can still override the offset commit
in Kafka by calling commit() again with the previous successful commit
value, i.e. revert to the previous commit), or conversely make the db txn
reference part of your atomic Kafka offset commit operation (assuming that
you can still revert that committed db txn if the offset commit fails and
cannot be retried). Either way, you inject the information of one
side's atomic operation into the other side's atomic operation. What you
described in solution 2 is the former case, and it is indeed how some JDBC
connectors under the Kafka Connect framework have been implemented to
support EOS.


Guozhang


On Mon, Mar 12, 2018 at 4:54 AM, Marasoiu, Nicu <
nicu.marasoiu@metrosystems.net> wrote:

> Hi,
> Indeed, solution 2 seems feasible, using a db transaction (e.g. a
> Cassandra batch) that includes the offset update.
> A sophisticated implementation is, for instance, under the hood of
> https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html but a
> "manual" implementation directly on top of the consumer seems feasible:
> - when a partition is newly assigned, at subscribe or rebalance, get its
> latest offset from the db and do consumer.seek on the partition to that
> offset (using only the onPartitionsAssigned rebalancing callback passed
> to subscribe - does it also cover the initial partition assignment to a
> consumer, at subscribe time?)
> - repeat
>   - poll a batch of messages from Kafka
>   - compute the results
>   - update the db with the results and the offset in the same transaction
>
> I have a few questions:
> - does the plan sound ok to you?
> - is there a risk that messages from the same partition reach multiple
> polling consumers if a rebalance moves the partition?
> - is it indeed sufficient to use onPartitionsAssigned and not
> onPartitionsRevoked (given that we only update the offset in the same
> transaction as a batch of results)?
> - does onPartitionsAssigned cover the startup/subscribe phase, i.e. the
> initial partitions with which the consumer starts?
> - if each batch of messages came from a single partition, we could have
> smaller transactions. One way I can think of is to call consumer.pause in
> onPartitionsAssigned and then, in the main loop, iterate through the
> partitions in rotation: resume one partition, poll, process, then move on
> to the next?
>
> Thank you,
> Nicu Marasoiu
RE: transactional behavior offsets+effects

Posted by "Marasoiu, Nicu" <ni...@metrosystems.net>.
Hi,
Indeed, solution 2 seems feasible, using a db transaction (e.g. a Cassandra batch) that includes the offset update.
A sophisticated implementation is, for instance, under the hood of https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html but a "manual" implementation directly on top of the consumer seems feasible:
- when a partition is newly assigned, at subscribe or rebalance, get its latest offset from the db and do consumer.seek on the partition to that offset
(using only the onPartitionsAssigned rebalancing callback passed to subscribe - does it also cover the initial partition assignment to a consumer, at subscribe time?)
- repeat
  - poll a batch of messages from Kafka
  - compute the results
  - update the db with the results and the offset in the same transaction
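
The plan above could look roughly like the following Python sketch. `FakeConsumer` is a hypothetical in-memory stand-in, not a real client API: it only mimics subscribe with an assignment callback, seek and poll. In particular, the fake invokes the callback for the initial assignment as well, which is an assumption of the sketch and not a statement about the real consumer. The table names are made up and SQLite stands in for the database.

```python
import sqlite3

class FakeConsumer:
    """Hypothetical in-memory stand-in for a Kafka consumer."""
    def __init__(self, logs):
        self.logs = logs          # partition -> list of values
        self.positions = {}       # partition -> next offset to read
        self.on_assigned = None

    def subscribe(self, on_assigned):
        self.on_assigned = on_assigned

    def assign(self, partitions):
        # Simulates an assignment (initial or rebalance) followed by the callback
        self.positions = {p: 0 for p in partitions}
        self.on_assigned(partitions)

    def seek(self, partition, offset):
        self.positions[partition] = offset

    def poll(self, max_records=2):
        batch = []
        for p, pos in self.positions.items():
            for off in range(pos, min(pos + max_records, len(self.logs[p]))):
                batch.append((p, off, self.logs[p][off]))
        for p, off, _ in batch:
            self.positions[p] = off + 1
        return batch

def make_db():
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE totals (part INTEGER PRIMARY KEY, total INTEGER)")
    db.execute("CREATE TABLE offsets (part INTEGER PRIMARY KEY, next_offset INTEGER)")
    db.commit()
    return db

def run(db, consumer, partitions):
    def on_assigned(parts):
        # Seek every newly assigned partition to the offset stored in the db
        for p in parts:
            row = db.execute("SELECT next_offset FROM offsets WHERE part = ?",
                             (p,)).fetchone()
            consumer.seek(p, row[0] if row else 0)

    consumer.subscribe(on_assigned)
    consumer.assign(partitions)
    while True:
        batch = consumer.poll()
        if not batch:
            break
        with db:  # results and offsets are committed in the same transaction
            for p, off, value in batch:
                db.execute("INSERT OR IGNORE INTO totals VALUES (?, 0)", (p,))
                db.execute("UPDATE totals SET total = total + ? WHERE part = ?",
                           (value, p))
                db.execute("INSERT OR REPLACE INTO offsets VALUES (?, ?)", (p, off + 1))

logs = {0: [1, 2, 3], 1: [10, 20]}
db = make_db()
run(db, FakeConsumer(logs), [0, 1])   # first consumer processes everything
run(db, FakeConsumer(logs), [0, 1])   # a new consumer seeks past the processed records
logs[0].append(4)
run(db, FakeConsumer(logs), [0])      # only the new record is applied
totals = dict(db.execute("SELECT part, total FROM totals"))
print(totals)
```

The second `run` finds nothing to do because the seek in the callback moves it past everything already recorded in the db.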

I have a few questions:
- does the plan sound ok to you?
- is there a risk that messages from the same partition reach multiple polling consumers if a rebalance moves the partition?
- is it indeed sufficient to use onPartitionsAssigned and not onPartitionsRevoked (given that we only update the offset in the same transaction as a batch of results)?
- does onPartitionsAssigned cover the startup/subscribe phase, i.e. the initial partitions with which the consumer starts?
- if each batch of messages came from a single partition, we could have smaller transactions. One way I can think of is to call consumer.pause in onPartitionsAssigned and then, in the main loop, iterate through the partitions in rotation: resume one partition, poll, process, then move on to the next?
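
On the last question, a possible alternative to pause/resume is to split an already polled batch by partition and commit one db transaction per partition. The Java client's ConsumerRecords exposes partitions() and records(partition), which should allow such a split. The Python sketch below only illustrates the idea: the batch is a plain list of hypothetical (partition, offset, value) tuples, the table names are made up, and SQLite stands in for the database.

```python
import sqlite3
from collections import defaultdict

def group_by_partition(batch):
    """batch: iterable of (partition, offset, value) tuples, as one poll might return."""
    grouped = defaultdict(list)
    for partition, offset, value in batch:
        grouped[partition].append((offset, value))
    return grouped

def apply_batch(db, batch):
    """Commit one db transaction per partition; return the number of transactions."""
    transactions = 0
    for partition, records in group_by_partition(batch).items():
        with db:  # results and offset of this partition only
            for offset, value in records:
                db.execute("INSERT INTO results VALUES (?, ?, ?)",
                           (partition, offset, value))
            last_offset = records[-1][0]
            db.execute("INSERT OR REPLACE INTO offsets VALUES (?, ?)",
                       (partition, last_offset + 1))
        transactions += 1
    return transactions

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE results (part INTEGER, record_offset INTEGER, payload TEXT)")
db.execute("CREATE TABLE offsets (part INTEGER PRIMARY KEY, next_offset INTEGER)")
db.commit()

batch = [(0, 5, "a"), (1, 9, "b"), (0, 6, "c")]
count = apply_batch(db, batch)
stored = dict(db.execute("SELECT part, next_offset FROM offsets"))
print(count, stored)
```

Each transaction then touches a single row of the offsets table, whatever mix of partitions the poll returned.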

Thank you,
Nicu Marasoiu