Posted to users@kafka.apache.org by Gabriel Giussi <ga...@gmail.com> on 2022/06/02 12:44:13 UTC

Re: What role plays transactional.id after KIP-447?

"I think we may overlooked it in documentation to emphasize that, in case
1), it should not expect ProducerFencedException. If so, we can fix the
javadoc."

IMHO that would be nice. I'm reviewing an existing codebase where we were
only handling ProducerFencedException, because the javadoc and the method
signature are explicit only about that, and CommitFailedException is not
even referenced but falls under the general KafkaException.
I think this could happen in both sendOffsetsToTransaction and
commitTransaction, right?
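To make it concrete, this is roughly the shape of the handling I have in
mind now; just a sketch, the method and variable names are mine, and it
assumes the 1-1 consumer-producer pairing discussed further down the thread:

// assumed imports: org.apache.kafka.clients.producer.*, org.apache.kafka.clients.consumer.*,
// org.apache.kafka.common.TopicPartition, org.apache.kafka.common.errors.ProducerFencedException,
// java.util.Map
void produceAndCommit(KafkaProducer<String, String> producer,
                      KafkaConsumer<String, String> consumer,
                      ProducerRecord<String, String> output,
                      Map<TopicPartition, OffsetAndMetadata> consumedOffsets) {
    try {
        producer.beginTransaction();
        producer.send(output);
        producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        // the case the javadoc told us about: fenced via transactional.id / producer epoch
        producer.close();
    } catch (CommitFailedException e) {
        // the case described in this thread: fenced at the consumer coordinator because the
        // group generation is stale; today this falls into our generic KafkaException handling
        producer.close();
    }
}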

Thanks.

On Tue, May 31, 2022 at 14:49, Guozhang Wang (<wa...@gmail.com>)
wrote:

> The CommitFailedException should be expected, since the fencing happens at
> the consumer coordinator. I.e. we can only fence the consumer-producer pair
> by the consumer's generation; we cannot fence via the producer epoch, since
> there's no other producer that has grabbed the same txn.id and bumped the
> producer epoch.
>
> So just to clarify, when the zombie comes back, it could be fenced either
> when:
>
> 1) it tries to complete the ongoing transaction via `sendOffset`, in which
> case it would see the CommitFailedException. The caller is then responsible
> for handling the thrown exception, which indicates being fenced.
> 2) it tries to heartbeat in the background thread and gets an
> InvalidGeneration error code, in which case it would trigger
> onPartitionsLost. The callback impl class is then responsible for handling
> that case, which indicates being fenced.
>
> I think we may have overlooked it in the documentation to emphasize that,
> in case 1), it should not expect ProducerFencedException. If so, we can fix
> the javadoc.
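> For case 2), a rough sketch of what the callback impl could do, assuming
> a 1-1 consumer-producer pairing (class and field names are made up):
>
> // sketch only; assumed imports: org.apache.kafka.clients.consumer.*,
> // org.apache.kafka.clients.producer.KafkaProducer, org.apache.kafka.common.TopicPartition,
> // java.util.Collection
> class FencingAwareRebalanceListener implements ConsumerRebalanceListener {
>     private final KafkaProducer<String, String> pairedProducer;
>
>     FencingAwareRebalanceListener(KafkaProducer<String, String> pairedProducer) {
>         this.pairedProducer = pairedProducer;
>     }
>
>     @Override
>     public void onPartitionsLost(Collection<TopicPartition> partitions) {
>         // the heartbeat got InvalidGeneration: the consumer-producer pair is effectively
>         // fenced, so close the old producer; the owner then creates a new one to pair
>         // with the re-joined consumer
>         pairedProducer.close();
>     }
>
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
> }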
>
>
>
>
> On Tue, May 31, 2022 at 8:26 AM Gabriel Giussi <ga...@gmail.com>
> wrote:
>
> > But there is no guarantee that the onPartitionsLost callback will be
> > called before a zombie producer coming back to life tries to continue
> > with the transaction, e.g. sending offsets or committing, so I should
> > handle the exception first, and I could directly create a new producer
> > there instead of doing it in the callback.
> > The curious part for me is that I was able to reproduce a case that
> > simulates a zombie producer that tries to send offsets after a rebalance,
> > but instead of failing with a ProducerFencedException it fails with a
> > CommitFailedException with this message: "Transaction offset Commit failed
> > due to consumer group metadata mismatch: Specified group generation id is
> > not valid.", which makes sense but is not even documented in the
> > KafkaProducer#sendOffsetsToTransaction javadoc.
> > Is this the expected behaviour, or should it fail with a
> > ProducerFencedException when the generation.id is outdated?
> > The case I reproduced is like this:
> > 1. Consumer A subscribes to topic X partitions 0 and 1, and starts a
> > producer with transactional.id = "tid.123"
> > 2. Consumer A fetches a message from partition 1 and hands it to another
> > thread to be processed (so the poll thread is not blocked)
> > 3. Producer A begins a transaction, sends to the output topic and gets
> > blocked (I'm using a lock here to simulate long processing) before calling
> > sendOffsetsToTransaction
> > 4. Consumer B is created and gets assigned partition 1 (I'm using the
> > CooperativeStickyAssignor) and creates a producer with transactional.id =
> > "tid.456"
> > 5. Consumer B fetches the same message, processes it and commits the
> > transaction successfully
> > 6. Producer A calls sendOffsetsToTransaction (because the lock was
> > released) and fails with CommitFailedException (sketched just below)
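> > That last call in step 6 is roughly just (variable names are mine):
> >
> > // producer A (the zombie) resumes once the lock is released:
> > try {
> >     producerA.sendOffsetsToTransaction(consumedOffsets, consumerA.groupMetadata());
> > } catch (CommitFailedException e) {
> >     // observed: "Specified group generation id is not valid", instead of the
> >     // ProducerFencedException I was expecting
> > }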
> >
> > This behaviour reflects what is described here:
> > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > but I was actually expecting a ProducerFencedException instead. Does that
> > exception only correspond to fencing done by transactional.id?
> >
> > Thanks
> >
> > On Tue, May 24, 2022 at 20:30, Guozhang Wang (<wa...@gmail.com>)
> > wrote:
> >
> > > No problem.
> > >
> > > The key is that at step 4, when the consumer re-joins, it will be aware
> > > that it has lost its previously assigned partitions and will trigger
> > > `onPartitionsLost` on the rebalance callback. And since in your scenario
> > > it's a 1-1 mapping from consumer to producer, it means the producer has
> > > been fenced and hence should be closed.
> > >
> > > So in that step 4, the old producer paired with Client A should be closed
> > > within the rebalance callback, and then one can create a new producer to
> > > pair with the re-joined consumer.
> > >
> > > On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi <gabrielgiussi@gmail.com>
> > > wrote:
> > >
> > > > Last question: the fencing occurs with sendOffsetsToTransaction, which
> > > > includes the ConsumerGroupMetadata. I guess the generation.id is what
> > > > matters here, since it is bumped with each rebalance.
> > > > But couldn't this happen?
> > > > 1. Client A consumes from topic partition P1 with generation.id = 1,
> > > > and a producer associated with it produces to some output topic, but a
> > > > long GC pause occurs before calling sendOffsetsToTransaction
> > > > 2. Client A gets out of sync and becomes a zombie due to session
> > > > timeout, and the group is rebalanced.
> > > > 3. Client B is assigned topic partition P1 with generation.id = 2,
> > > > calls sendOffsetsToTransaction and commits the txn
> > > > 4. Client A is back online and joins again with generation.id = 3
> > > > (this happens in some internal thread)
> > > > 5. The thread that was about to call sendOffsetsToTransaction is
> > > > scheduled and calls sendOffsetsToTransaction with generation.id = 3,
> > > > which is the current one, so it won't be fenced.
> > > >
> > > > I'm asking this because we always ask the consumer object for the
> > > > current consumerGroupMetadata, not the one that was current when the
> > > > offsets were consumed, like this:
> > > > producer.sendOffsetsToTransaction(consumedOffsets,
> > > > consumer.groupMetadata());
> > > >
> > > > Couldn't this return a groupMetadata that has a valid generation.id
> > > > even when it is not the same as it was at the moment of consuming the
> > > > messages that are about to be committed?
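> > > > In other words, I mean the difference between that call and
> > > > snapshotting the metadata right after poll(), something like this
> > > > sketch:
> > > >
> > > > // snapshot taken when the records were actually fetched ...
> > > > ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
> > > > ConsumerGroupMetadata metadataAtConsumeTime = consumer.groupMetadata();
> > > > // ... processing happens here ...
> > > > // ... versus what we do today: read it again at commit time, when the
> > > > // generation may already have been bumped
> > > > producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());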
> > > >
> > > > I'm sure I'm missing something (probably in step 4) that makes this
> > > > not a possible scenario, but I can't say what it is.
> > > >
> > > > Sorry if the question is too confusing.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, May 24, 2022 at 12:49, Guozhang Wang (<wangguoz@gmail.com>)
> > > > wrote:
> > > >
> > > > > Hi Gabriel,
> > > > >
> > > > > What I meant is that with KIP-447, the fencing is achieved at the
> > > > > time of committing, with the consumer metadata. If, within a
> > > > > transaction, the producer would always try to commit at least once
> > > > > on behalf of the consumer, AND a zombie of the producer would always
> > > > > come from a zombie of a consumer, then the transaction would be
> > > > > guaranteed to be fenced. But:
> > > > >
> > > > > 1) If within a transaction there's no `sendOffset..` triggered, then
> > > > > fencing still needs to be done by the txn coordinator, and txn.id
> > > > > plays the role here ---- I think this is not your scenario.
> > > > > 2) If a consumer may be "represented" by multiple producers, and a
> > > > > zombie producer does not come from a zombie consumer, then we still
> > > > > need the fencing to be done via the txn.id --- this is the scenario
> > > > > I'd like to remind you about. For example, if two producers could be
> > > > > (mistakenly) created with different txn.ids and are paired with the
> > > > > same consumer, then the new API in KIP-447 would not fence one of
> > > > > them.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi <gabrielgiussi@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hello Guozhang,
> > > > > >
> > > > > > thanks for the response. I have some doubts about the "N-1
> > > > > > producer-consumer" case you mentioned, and why and how I may need
> > > > > > to configure the transactional id there. Is this the case of N
> > > > > > consumers sharing the same producer?
> > > > > >
> > > > > > My current implementation creates a consumer per topic (I don't
> > > > > > subscribe to multiple topics from the same consumer) and starts a
> > > > > > producer per consumer, so the relation is 1 consumer/topic => 1
> > > > > > producer, and the transactional id is set as
> > > > > > <consumer-group>-<topic>-<random-uuid>.
> > > > > > Do you see any problem with this configuration?
> > > > > >
> > > > > > Thanks again.
> > > > > >
> > > > > > On Sat, May 21, 2022 at 16:37, Guozhang Wang (<wangguoz@gmail.com>)
> > > > > > wrote:
> > > > > >
> > > > > > > Hello Gabriel,
> > > > > > >
> > > > > > > What you're asking is a very fair question :) In fact, for
> > > > > > > Streams, where the partition assignment to producer-consumer
> > > > > > > pairs is purely flexible, we think the new EOS would not have a
> > > > > > > hard requirement on transactional.id:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-9453
> > > > > > >
> > > > > > > If you implemented the transactional messaging via a DIY
> > > > > > > producer+consumer though, it depends on the expected lifetime of
> > > > > > > a producer: e.g. if you have a 1-1 producer-consumer mapping then
> > > > > > > transactional.id is not crucial, but if you have an N-1
> > > > > > > producer-consumer mapping then you may still need to configure
> > > > > > > that id.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi <gabrielgiussi@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Before KIP-447 I understood the use of transactional.id as
> > > > > > > > preventing zombies from introducing duplicates, as explained in
> > > > > > > > this talk: https://youtu.be/j0l_zUhQaTc?t=822.
> > > > > > > > So in order to get zombie fencing working correctly we should
> > > > > > > > assign producers a transactional.id that includes the partition
> > > > > > > > id, something like <application><topic>-<partition-id>, as
> > > > > > > > shown in this slide https://youtu.be/j0l_zUhQaTc?t=1047 where
> > > > > > > > processor 2 should use the same txn.id A as process 1 that
> > > > > > > > crashed.
> > > > > > > > This prevented process 2 from consuming the message again and
> > > > > > > > committing while process 1 could come back to life and also
> > > > > > > > commit the pending transaction, hence having duplicate messages
> > > > > > > > produced. In this case process 1 would be fenced by having an
> > > > > > > > outdated epoch.
> > > > > > > >
> > > > > > > > With KIP-447 we no longer have that potential scenario of two
> > > > > > > > pending transactions trying to produce and mark a message as
> > > > > > > > committed, because we won't let process 2 even start the
> > > > > > > > transaction if there is a pending one (basically by not
> > > > > > > > returning any messages, since we reject the Offset Fetch if
> > > > > > > > there is a pending transaction for that offset partition). This
> > > > > > > > is explained in this post:
> > > > > > > > https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > > > > > >
> > > > > > > > Having that, I no longer see the value of transactional.id or
> > > > > > > > how I should configure it in my producers. The main benefit of
> > > > > > > > KIP-447 is that we no longer have to start one producer per
> > > > > > > > input partition; quoting the post:
> > > > > > > > "The only way the static assignment requirement could be met is
> > > > > > > > if each input partition uses a separate producer instance,
> > > > > > > > which is in fact what Kafka Streams previously relied on.
> > > > > > > > However, this made running EOS applications much more costly in
> > > > > > > > terms of the client resources and load on the brokers. A large
> > > > > > > > number of client connections could heavily impact the stability
> > > > > > > > of brokers and become a waste of resources as well."
> > > > > > > >
> > > > > > > > I guess now I can reuse my producer across different input
> > > > > > > > partitions, so what transactional.id should I assign to it, and
> > > > > > > > why should I care? Isn't zombie fencing already resolved by
> > > > > > > > rejecting the offset fetch?
> > > > > > > >
> > > > > > > > Thanks.
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: What role plays transactional.id after KIP-447?

Posted by Guozhang Wang <wa...@gmail.com>.
I think "commitTransaction" should not throw CommitFailedException. Here
admittedly we are overusing the term "commit" here, as we use it for two
operations: committing the offsets (used for consumer, in either EOS or
ALOS), and committing the transaction. The exception is meant for the
former and would not be expected in `commitTransaction`.


Guozhang

On Thu, Jun 2, 2022 at 5:45 AM Gabriel Giussi <ga...@gmail.com>
wrote:

> "I think we may overlooked it in documentation to emphasize that, in case
> 1), it should not expect ProducerFencedException. If so, we can fix the
> javadoc."
>
> IMHO that would be nice, I'm reviewing an existing codebase where we were
> only handling ProducerFencedException, because the javadoc and the method
> signature is explicit only about that, and CommitFailedException is not
> even referenced but falls under the general KafkaException.
> I think this could happen in both sendOffsetsToTransaction and
> commitTransaction right?
>
> Thanks.


-- 
-- Guozhang