Posted to users@kafka.apache.org by Viliam Durina <vi...@hazelcast.com> on 2019/06/06 08:56:24 UTC

Continuing a Kafka producer transaction after producer restart

The aim of having transactional.id configured for the producer is, in
my understanding, to fence off a zombie producer and to proactively
abort its transactions to avoid the need to wait for a timeout.
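
For context, the normal lifecycle with the public API looks like this
(a minimal sketch; the broker address, topic, transactional.id and
serializers are placeholders, error handling omitted):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// fences off any older producer with the same transactional.id and
// aborts whatever transaction it left open
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.commitTransaction();   // or abortTransaction()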

What I'm interested in is being able to continue the transaction
after a restart. For example:

producer.beginTransaction();
producer.send(...);
// now the producer crashes

// start a new producer with the same transactional.id
producer = new KafkaProducer<>(...);
// this aborts the unfinished transaction of the previous producer
producer.initTransactions();

My gut feeling is that it should be technically possible; there's
just no API for it. Is there anything that prevents us from doing that?

Why do I need this? The Kafka transaction in my case is part of a
larger distributed transaction, which failed during the second phase.
The transaction coordinator saved its state before it was restarted,
so it knows that some participants might have committed and some
might not, and it therefore requires all participants to finish the
commit from the previous run.
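
Concretely, what I'd like a participant to be able to do on recovery
is something like the sketch below. resumeTransaction() is hypothetical
and does not exist in KafkaProducer, and PreparedTxn, coordinatorLog,
txnId and props are just stand-ins for the coordinator's persisted state:

// hypothetical recovery path of one participant after a crash
PreparedTxn prepared = coordinatorLog.read(txnId);  // record written before the crash (hypothetical)
KafkaProducer<String, String> producer = new KafkaProducer<>(props);  // same transactional.id as before
if (prepared != null && prepared.commitDecided) {
    // hypothetical API: adopt the producer id and epoch of the crashed producer
    producer.resumeTransaction(prepared.producerId, prepared.epoch);
    producer.commitTransaction();   // finish the commit from the previous run
} else {
    // no commit decision was recorded, so it's safe to abort the in-doubt transaction
    producer.initTransactions();
}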

Viliam

Re: Continuing a Kafka producer transaction after producer restart

Posted by Viliam Durina <vi...@hazelcast.com>.
It's hacked in Apache Flink using reflection:
https://github.com/apache/flink/blob/c7bf460b15ff1501f1d0ffa24ad5a074032bc503/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L138-L164
It would be nice to have this feature supported in the API.
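
If I read that class correctly, the idea is to pull the producer id and
epoch out of the running producer's internal TransactionManager via
reflection, persist them, and after a restart push them back into a
fresh producer and force its state machine into an open transaction, so
that commitTransaction() can be called without beginTransaction(). Just
to illustrate the read side, a minimal sketch (the field names
"transactionManager" and "producerIdAndEpoch" are Kafka client
internals, not public API, and may differ between client versions):

import java.lang.reflect.Field;
import org.apache.kafka.clients.producer.KafkaProducer;

// returns {producerId, epoch} of the producer's current transactional session
static long[] readProducerIdAndEpoch(KafkaProducer<?, ?> producer) throws ReflectiveOperationException {
    Field tmField = KafkaProducer.class.getDeclaredField("transactionManager");
    tmField.setAccessible(true);
    Object transactionManager = tmField.get(producer);

    Field pieField = transactionManager.getClass().getDeclaredField("producerIdAndEpoch");
    pieField.setAccessible(true);
    Object producerIdAndEpoch = pieField.get(transactionManager);

    Field idField = producerIdAndEpoch.getClass().getDeclaredField("producerId");
    Field epochField = producerIdAndEpoch.getClass().getDeclaredField("epoch");
    idField.setAccessible(true);
    epochField.setAccessible(true);
    return new long[] { idField.getLong(producerIdAndEpoch), epochField.getShort(producerIdAndEpoch) };
}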

Viliam
