Posted to users@kafka.apache.org by Vojtech Juranek <vj...@redhat.com> on 2023/06/08 11:53:16 UTC

Kafka Connect exactly-once semantic and very large transactions

Hi,
I'm investigating possibilities of exactly-once semantics for Debezium [1] 
Kafka Connect source connectors, which implement change data capture for 
various databases. Debezium has two phases: an initial snapshot phase and a 
streaming phase. The initial snapshot phase loads existing data from the 
database and sends it to Kafka; the subsequent streaming phase captures any 
changes to the data.

Exactly-once delivery seems to work really well during the streaming phase. 
Now I'm investigating how to ensure exactly-once delivery for the initial 
snapshot phase. If the snapshot fails (e.g. due to a DB connection drop or a 
worker node crash), we force a new snapshot after the restart, as the data may 
have changed in the meantime and the snapshot has to reflect the state of the 
data at the time it was executed. However, re-taking the snapshot produces 
duplicate records in the related Kafka topics. 

Probably the easiest solution to this issue is to run the whole snapshot in a 
single Kafka transaction. This may result in a huge transaction, containing 
millions of records, in some cases even billions. As these records cannot be 
consumed until the transaction is committed, and the logs therefore cannot be 
compacted, this could lead to a huge increase in the size of the Kafka logs. 
Also, as for large DBs this is a time-consuming process, it would very likely 
result in transaction timeouts (unless the timeout is set to a very large 
value).
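
To make the idea concrete, with Connect's exactly-once source support 
(KIP-618) the connector could in principle define the transaction boundary 
itself (transaction.boundary=connector) and only ask the worker to commit 
once the snapshot is complete. A minimal sketch, not how Debezium currently 
works; nextSnapshotChunk() and isLastSnapshotChunk() are made-up placeholders:

    import java.util.List;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;
    import org.apache.kafka.connect.source.TransactionContext;

    // Sketch of a source task that emits the whole snapshot inside one
    // connector-defined transaction (requires exactly.once.source.support=enabled
    // on the workers and transaction.boundary=connector on the connector).
    public abstract class SnapshotTaskSketch extends SourceTask {

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            List<SourceRecord> batch = nextSnapshotChunk();   // placeholder: next chunk of table rows
            if (batch.isEmpty()) {
                return null;
            }
            TransactionContext txn = context.transactionContext(); // null unless boundaries are connector-defined
            if (txn != null && isLastSnapshotChunk()) {            // placeholder end-of-snapshot check
                // Commit after the last record of this batch, so everything produced
                // since the snapshot started becomes visible to consumers atomically.
                txn.commitTransaction(batch.get(batch.size() - 1));
            }
            return batch;
        }

        protected abstract List<SourceRecord> nextSnapshotChunk();
        protected abstract boolean isLastSnapshotChunk();
    }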

Is my understanding of the impact of very large transactions correct? Are 
there any other drawbacks I'm missing (e.g. could it also result in memory 
issues or something similar)?

Thanks in advance!
Vojta

[1] https://debezium.io/

Re: Kafka Connect exactly-once semantic and very large transactions

Posted by Vojtech Juranek <vj...@redhat.com>.
Hi Chris,
thanks for your response!

Yes, we are also looking at other ways to enable exactly-once semantics for 
existing data (e.g. using an incremental snapshot, which snapshots the data 
incrementally and in smaller chunks), but first we would like to fully 
understand all the implications and consequences for Kafka, so if anyone wants 
to provide more insights or recommendations not mentioned in your response, 
they would still be very welcome.

Thanks!
Vojta

On Thursday, 8 June 2023 15:58:37 CEST Chris Egerton wrote:
> Hi Vojta,
> 
> From my limited understanding of the Debezium snapshot process, I believe
> that you're correct that producing the entire snapshot in a transaction is
> the way to provide exactly-once semantics during that phase. If there's a
> way to recover in-progress snapshots and skip over already-produced
> records, then that could be a suitable alternative.
> 
> You're correct that a large transaction timeout may be required to
> accommodate this case (we even try to call this out in the error message
> that users see on transaction timeouts [1]). I'm not very familiar with
> broker logic but with my limited understanding, your assessment of the
> impact of delayed log compaction also seems valid.
> 
> The only other issue that comes to my mind is that latency will be higher
> for downstream consumers since they won't be able to read any records until
> the entire transaction is complete, assuming they're using the
> read_committed isolation level. But given that this is the snapshotting
> phase and you're presumably moving historical data instead of real-time
> updates to your database, this should hopefully be acceptable for most
> users.
> 
> I'd be interested to hear what someone more familiar with client and broker
> internals has to say! Going to be following this thread.
> 
> [1] -
> https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L357
> 
> Cheers,
> 
> Chris
> 
> On Thu, Jun 8, 2023 at 7:53 AM Vojtech Juranek <vj...@redhat.com> wrote:
> > Hi,
> > I'm investigating possibilities of exactly-once semantics for Debezium [1]
> > Kafka Connect source connectors, which implement change data capture for
> > various databases. Debezium has two phases: an initial snapshot phase and a
> > streaming phase. The initial snapshot phase loads existing data from the
> > database and sends it to Kafka; the subsequent streaming phase captures any
> > changes to the data.
> >
> > Exactly-once delivery seems to work really well during the streaming phase.
> > Now I'm investigating how to ensure exactly-once delivery for the initial
> > snapshot phase. If the snapshot fails (e.g. due to a DB connection drop or a
> > worker node crash), we force a new snapshot after the restart, as the data
> > may have changed in the meantime and the snapshot has to reflect the state
> > of the data at the time it was executed. However, re-taking the snapshot
> > produces duplicate records in the related Kafka topics.
> >
> > Probably the easiest solution to this issue is to run the whole snapshot in
> > a single Kafka transaction. This may result in a huge transaction,
> > containing millions of records, in some cases even billions. As these
> > records cannot be consumed until the transaction is committed, and the logs
> > therefore cannot be compacted, this could lead to a huge increase in the
> > size of the Kafka logs. Also, as for large DBs this is a time-consuming
> > process, it would very likely result in transaction timeouts (unless the
> > timeout is set to a very large value).
> >
> > Is my understanding of the impact of very large transactions correct? Are
> > there any other drawbacks I'm missing (e.g. could it also result in memory
> > issues or something similar)?
> >
> > Thanks in advance!
> > Vojta
> >
> > [1] https://debezium.io/


Re: Kafka Connect exactly-once semantic and very large transactions

Posted by Chris Egerton <fe...@gmail.com>.
Hi Vojta,

From my limited understanding of the Debezium snapshot process, I believe
that you're correct that producing the entire snapshot in a transaction is
the way to provide exactly-once semantics during that phase. If there's a
way to recover in-progress snapshots and skip over already-produced
records, then that could be a suitable alternative.
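
Just to make that alternative concrete: a resumable snapshot would mean
recording snapshot progress in the source offsets, which with exactly-once
enabled are committed atomically with the records, and then skipping
already-produced rows on restart. A rough sketch under that assumption (the
partition key, the "last_row" field and the seek/chunk helpers are all made
up, and as you describe, Debezium currently re-snapshots on purpose because
the data may have changed in the meantime):

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    public abstract class ResumableSnapshotTaskSketch extends SourceTask {

        private static final Map<String, Object> PARTITION = Map.of("table", "customers"); // placeholder

        @Override
        public void start(Map<String, String> config) {
            // Read back the last committed snapshot position, if any, and resume after it.
            Map<String, Object> stored = context.offsetStorageReader().offset(PARTITION);
            long resumeFrom = stored == null ? 0L : ((Number) stored.get("last_row")).longValue();
            seekSnapshotTo(resumeFrom); // placeholder: position the DB cursor after that row
        }

        @Override
        public List<SourceRecord> poll() {
            // Each emitted record carries its row position in its sourceOffset map,
            // so the framework's offset commits double as snapshot progress markers.
            return nextSnapshotChunk(PARTITION);
        }

        protected abstract void seekSnapshotTo(long rowPosition);
        protected abstract List<SourceRecord> nextSnapshotChunk(Map<String, Object> partition);
    }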

You're correct that a large transaction timeout may be required to
accommodate this case (we even try to call this out in the error message
that users see on transaction timeouts [1]). I'm not very familiar with
broker logic but with my limited understanding, your assessment of the
impact of delayed log compaction also seems valid.
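
For concreteness, the timeout is bounded on two sides; the values below are
just the current defaults, and the numbers you'd actually need depend on how
long the snapshot runs:

    # Broker side: hard upper bound on any producer's transaction timeout
    # (default 900000 ms, i.e. 15 minutes)
    transaction.max.timeout.ms=900000

    # Connector side: raise the transactional producer's timeout (producer
    # default is 60000 ms) up to that bound, assuming the worker's
    # connector.client.config.override.policy allows the override
    producer.override.transaction.timeout.ms=900000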

The only other issue that comes to my mind is that latency will be higher
for downstream consumers since they won't be able to read any records until
the entire transaction is complete, assuming they're using the
read_committed isolation level. But given that this is the snapshotting
phase and you're presumably moving historical data instead of real-time
updates to your database, this should hopefully be acceptable for most
users.
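
To make that visible end-to-end, a minimal read_committed consumer would look
roughly like this (broker address, group id and topic name are placeholders):

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "snapshot-readers");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // Records from the still-open snapshot transaction are withheld
            // until the transaction commits (or dropped if it aborts).
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("inventory.customers")); // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.printf("%s -> %s%n", r.key(), r.value()));
            }
        }
    }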

I'd be interested to hear what someone more familiar with client and broker
internals has to say! Going to be following this thread.

[1] -
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L357

Cheers,

Chris

On Thu, Jun 8, 2023 at 7:53 AM Vojtech Juranek <vj...@redhat.com> wrote:

> Hi,
> I'm investigating possibilities of exactly-once semantics for Debezium [1]
> Kafka Connect source connectors, which implement change data capture for
> various databases. Debezium has two phases: an initial snapshot phase and a
> streaming phase. The initial snapshot phase loads existing data from the
> database and sends it to Kafka; the subsequent streaming phase captures any
> changes to the data.
>
> Exactly-once delivery seems to work really well during the streaming phase.
> Now I'm investigating how to ensure exactly-once delivery for the initial
> snapshot phase. If the snapshot fails (e.g. due to a DB connection drop or a
> worker node crash), we force a new snapshot after the restart, as the data
> may have changed in the meantime and the snapshot has to reflect the state
> of the data at the time it was executed. However, re-taking the snapshot
> produces duplicate records in the related Kafka topics.
>
> Probably the easiest solution to this issue is to run the whole snapshot in
> a single Kafka transaction. This may result in a huge transaction,
> containing millions of records, in some cases even billions. As these
> records cannot be consumed until the transaction is committed, and the logs
> therefore cannot be compacted, this could lead to a huge increase in the
> size of the Kafka logs. Also, as for large DBs this is a time-consuming
> process, it would very likely result in transaction timeouts (unless the
> timeout is set to a very large value).
>
> Is my understanding of the impact of very large transactions correct? Are
> there any other drawbacks I'm missing (e.g. could it also result in memory
> issues or something similar)?
>
> Thanks in advance!
> Vojta
>
> [1] https://debezium.io/