You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Urbán Dániel <ur...@gmail.com> on 2022/10/22 07:45:33 UTC

Re: Transactions, delivery timeout and changing transactional producer behavior

Hi everyone,

I would like to bump this transactional producer bug 
again:https://issues.apache.org/jira/browse/KAFKA-14053
Which already has an open PR: https://github.com/apache/kafka/pull/12392

There is a not-so-frequent, but - in my opinion - critical bug in 
transactional producers. If the bug occurs, it corrupts the partition 
and blocks read_committed consumers.

Can someone please take a look? I see 2 possible solutions, implemented 
one of those. I'm willing to work on the 2nd approach as well, but I 
need some input on the PR.

Thanks in advance,
Daniel

2022. 09. 13. 18:46 keltezéssel, Chris Egerton írta:
> Hi Colt,
>
> You can certainly review PRs, but you cannot merge them. Reviews are still
> valuable as they help lighten the workload on committers and, to some
> degree, provide a signal of how high-priority a change is to the community.
>
> Cheers,
>
> Chris
>
> On Sun, Sep 11, 2022 at 12:19 PM Colt McNealy <co...@littlehorse.io> wrote:
>
>> Hi all—
>>
>> I'm not a committer so I can't review this PR (or is that not true?).
>> However, I'd like to bump this as well. I believe that I've encountered
>> this bug during chaos testing with the transactional producer. I can
>> sometimes produce this error when killing a broker during a long-running
>> transaction, which causes a batch to encounter delivery timeout as
>> described in the Jira. I have observed some inconsistencies with
>> the consumer offset being advanced prematurely (i.e. perhaps after the
>> delivery of the EndTxnRequest).
>>
>> Daniel, thank you for the PR.
>>
>> Cheers,
>> Colt McNealy
>> *Founder, LittleHorse.io*
>>
>> On Fri, Sep 9, 2022 at 9:54 AM Dániel Urbán <ur...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I would like to bump this and bring some attention to the issue.
>>> This is a nasty bug in the transactional producer, would be nice if I
>> could
>>> get some feedback on the PR: https://github.com/apache/kafka/pull/12392
>>>
>>> Thanks in advance,
>>> Daniel
>>>
>>> Viktor Somogyi-Vass <vi...@cloudera.com.invalid> ezt írta
>>> (időpont: 2022. júl. 25., H, 15:28):
>>>
>>>> Hi Luke & Artem,
>>>>
>>>> We prepared the fix, would you please help in getting a
>>> committer-reviewer
>>>> to get this issue resolved?
>>>>
>>>> Thanks,
>>>> Viktor
>>>>
>>>> On Fri, Jul 8, 2022 at 12:57 PM Dániel Urbán <ur...@gmail.com>
>>>> wrote:
>>>>
>>>>> Submitted a PR with the fix:
>>> https://github.com/apache/kafka/pull/12392
>>>>> In the PR I tried keeping the producer in a usable state after the
>>> forced
>>>>> bump. I understand that it might be the cleanest solution, but the
>> only
>>>>> other option I know of is to transition into a fatal state, meaning
>>> that
>>>>> the producer has to be recreated after a delivery timeout. I think
>> that
>>>> is
>>>>> still fine compared to the out-of-order messages.
>>>>>
>>>>> Looking forward to your reviews,
>>>>> Daniel
>>>>>
>>>>> Dániel Urbán <ur...@gmail.com> ezt írta (időpont: 2022. júl.
>> 7.,
>>>> Cs,
>>>>> 12:04):
>>>>>
>>>>>> Thanks for the feedback, I created
>>>>>> https://issues.apache.org/jira/browse/KAFKA-14053 and started
>>> working
>>>> on
>>>>>> a PR.
>>>>>>
>>>>>> Luke, for the workaround, we used the transaction admin tool
>> released
>>>> in
>>>>>> 3.0 to "abort" these hanging batches manually.
>>>>>> Naturally, the cluster health should be stabilized. This issue
>> popped
>>>> up
>>>>>> most frequently around times when some partitions went into a few
>>>> minute
>>>>>> window of unavailability. The infinite retries on the producer side
>>>>> caused
>>>>>> a situation where the last retry was still in-flight, but the
>>> delivery
>>>>>> timeout was triggered on the client side. We reduced the retries
>> and
>>>>>> increased the delivery timeout to avoid such situations.
>>>>>> Still, the issue can occur in other scenarios, for example a client
>>>>>> queueing up many batches in the producer buffer, and causing those
>>>>> batches
>>>>>> to spend most of the delivery timeout window in the client memory.
>>>>>>
>>>>>> Thanks,
>>>>>> Daniel
>>>>>>
>>>>>> Luke Chen <sh...@gmail.com> ezt írta (időpont: 2022. júl. 7.,
>> Cs,
>>>>> 5:13):
>>>>>>> Hi Daniel,
>>>>>>>
>>>>>>> Thanks for reporting the issue, and the investigation.
>>>>>>> I'm curious, so, what's your workaround for this issue?
>>>>>>>
>>>>>>> I agree with Artem, it makes sense. Please file a bug in JIRA.
>>>>>>> And looking forward to your PR! :)
>>>>>>>
>>>>>>> Thank you.
>>>>>>> Luke
>>>>>>>
>>>>>>> On Thu, Jul 7, 2022 at 3:07 AM Artem Livshits
>>>>>>> <al...@confluent.io.invalid> wrote:
>>>>>>>
>>>>>>>> Hi Daniel,
>>>>>>>>
>>>>>>>> What you say makes sense.  Could you file a bug and put this
>> info
>>>>> there
>>>>>>> so
>>>>>>>> that it's easier to track?
>>>>>>>>
>>>>>>>> -Artem
>>>>>>>>
>>>>>>>> On Wed, Jul 6, 2022 at 8:34 AM Dániel Urbán <
>>> urb.daniel7@gmail.com>
>>>>>>> wrote:
>>>>>>>>> Hello everyone,
>>>>>>>>>
>>>>>>>>> I've been investigating some transaction related issues in a
>>> very
>>>>>>>>> problematic cluster. Besides finding some interesting issues,
>> I
>>>> had
>>>>>>> some
>>>>>>>>> ideas about how transactional producer behavior could be
>>> improved.
>>>>>>>>> My suggestion in short is: when the transactional producer
>>>>> encounters
>>>>>>> an
>>>>>>>>> error which doesn't necessarily mean that the in-flight
>> request
>>>> was
>>>>>>>>> processed (for example a client side timeout), the producer
>>> should
>>>>> not
>>>>>>>> send
>>>>>>>>> an EndTxnRequest on abort, but instead it should bump the
>>> producer
>>>>>>> epoch.
>>>>>>>>> The long description about the issue I found, and how I came
>> to
>>>> the
>>>>>>>>> suggestion:
>>>>>>>>>
>>>>>>>>> First, the description of the issue. When I say that the
>> cluster
>>>> is
>>>>>>> "very
>>>>>>>>> problematic", I mean all kinds of different issues, be it
>> infra
>>>>> (disks
>>>>>>>> and
>>>>>>>>> network) or throughput (high volume producers without fine
>>>> tuning).
>>>>>>>>> In this cluster, Kafka transactions are widely used by many
>>>>> producers.
>>>>>>>> And
>>>>>>>>> in this cluster, partitions get "stuck" frequently (few times
>>>> every
>>>>>>>> week).
>>>>>>>>> The exact meaning of a partition being "stuck" is this:
>>>>>>>>>
>>>>>>>>> On the client side:
>>>>>>>>> 1. A transactional producer sends X batches to a partition in
>> a
>>>>> single
>>>>>>>>> transaction
>>>>>>>>> 2. Out of the X batches, the last few get sent, but are timed
>>> out
>>>>>>> thanks
>>>>>>>> to
>>>>>>>>> the delivery timeout config
>>>>>>>>> 3. producer.flush() is unblocked due to all batches being
>>>> "finished"
>>>>>>>>> 4. Based on the errors reported in the producer.send()
>> callback,
>>>>>>>>> producer.abortTransaction() is called
>>>>>>>>> 5. Then producer.close() is also invoked with a 5s timeout
>> (this
>>>>>>>>> application does not reuse the producer instances optimally)
>>>>>>>>> 6. The transactional.id of the producer is never reused (it
>> was
>>>>>>> random
>>>>>>>>> generated)
>>>>>>>>>
>>>>>>>>> On the partition leader side (what appears in the log segment
>> of
>>>> the
>>>>>>>>> partition):
>>>>>>>>> 1. The batches sent by the producer are all appended to the
>> log
>>>>>>>>> 2. But the ABORT marker of the transaction was appended before
>>> the
>>>>>>> last 1
>>>>>>>>> or 2 batches of the transaction
>>>>>>>>>
>>>>>>>>> On the transaction coordinator side (what appears in the
>>>> transaction
>>>>>>>> state
>>>>>>>>> partition):
>>>>>>>>> The transactional.id is present with the Empty state.
>>>>>>>>>
>>>>>>>>> These happenings result in the following:
>>>>>>>>> 1. The partition leader handles the first batch after the
>> ABORT
>>>>>>> marker as
>>>>>>>>> the first message of a new transaction of the same producer
>> id +
>>>>>>> epoch.
>>>>>>>>> (LSO is blocked at this point)
>>>>>>>>> 2. The transaction coordinator is not aware of any in-progress
>>>>>>>> transaction
>>>>>>>>> of the producer, thus never aborting the transaction, not even
>>>> after
>>>>>>> the
>>>>>>>>> transaction.timeout.ms passes.
>>>>>>>>>
>>>>>>>>> This is happening with Kafka 2.5 running in the cluster,
>>> producer
>>>>>>>> versions
>>>>>>>>> range between 2.0 and 2.6.
>>>>>>>>> I scanned through a lot of tickets, and I believe that this
>>> issue
>>>> is
>>>>>>> not
>>>>>>>>> specific to these versions, and could happen with newest
>>> versions
>>>> as
>>>>>>>> well.
>>>>>>>>> If I'm mistaken, some pointers would be appreciated.
>>>>>>>>>
>>>>>>>>> Assuming that the issue could occur with any version, I
>> believe
>>>> this
>>>>>>>> issue
>>>>>>>>> boils down to one oversight on the client side:
>>>>>>>>> When a request fails without a definitive response (e.g. a
>>>> delivery
>>>>>>>>> timeout), the client cannot assume that the request is
>>> "finished",
>>>>> and
>>>>>>>>> simply abort the transaction. If the request is still in
>> flight,
>>>> and
>>>>>>> the
>>>>>>>>> EndTxnRequest, then the WriteTxnMarkerRequest gets sent and
>>>>> processed
>>>>>>>>> earlier, the contract is violated by the client.
>>>>>>>>> This could be avoided by providing more information to the
>>>> partition
>>>>>>>>> leader. Right now, a new transactional batch signals the start
>>> of
>>>> a
>>>>>>> new
>>>>>>>>> transaction, and there is no way for the partition leader to
>>>> decide
>>>>>>>> whether
>>>>>>>>> the batch is an out-of-order message.
>>>>>>>>> In a naive and wasteful protocol, we could have a unique
>>>> transaction
>>>>>>> id
>>>>>>>>> added to each batch and marker, meaning that the leader would
>> be
>>>>>>> capable
>>>>>>>> of
>>>>>>>>> refusing batches which arrive after the control marker of the
>>>>>>>> transaction.
>>>>>>>>> But instead of changing the log format and the protocol, we
>> can
>>>>>>> achieve
>>>>>>>> the
>>>>>>>>> same by bumping the producer epoch.
>>>>>>>>>
>>>>>>>>> Bumping the epoch has a similar effect to "changing the
>>>> transaction
>>>>>>> id" -
>>>>>>>>> the in-progress transaction will be aborted with a bumped
>>> producer
>>>>>>> epoch,
>>>>>>>>> telling the partition leader about the producer epoch change.
>>> From
>>>>>>> this
>>>>>>>>> point on, any batches sent with the old epoch will be refused
>> by
>>>> the
>>>>>>>> leader
>>>>>>>>> due to the fencing mechanism. It doesn't really matter how
>> many
>>>>>>> batches
>>>>>>>>> will get appended to the log, and how many will be refused -
>>> this
>>>> is
>>>>>>> an
>>>>>>>>> aborted transaction - but the out-of-order message cannot
>> occur,
>>>> and
>>>>>>>> cannot
>>>>>>>>> block the LSO infinitely.
>>>>>>>>>
>>>>>>>>> My suggestion is, that the TransactionManager inside the
>>> producer
>>>>>>> should
>>>>>>>>> keep track of what type of errors were encountered by the
>>> batches
>>>> of
>>>>>>> the
>>>>>>>>> transaction, and categorize them along the lines of
>> "definitely
>>>>>>>> completed"
>>>>>>>>> and "might not be completed". When the transaction goes into
>> an
>>>>>>> abortable
>>>>>>>>> state, and there is at least one batch with "might not be
>>>>> completed",
>>>>>>> the
>>>>>>>>> EndTxnRequest should be skipped, and an epoch bump should be
>>> sent.
>>>>>>>>> As for what type of error counts as "might not be completed",
>> I
>>>> can
>>>>>>> only
>>>>>>>>> think of client side timeouts.
>>>>>>>>>
>>>>>>>>> I believe this is a relatively small change (only affects the
>>>> client
>>>>>>>> lib),
>>>>>>>>> but it helps in avoiding some corrupt states in Kafka
>>>> transactions.
>>>>>>>>> Looking forward to your input. If it seems like a sane idea, I
>>> go
>>>>>>> ahead
>>>>>>>> and
>>>>>>>>> submit a PR for it as well.
>>>>>>>>>
>>>>>>>>> Thanks in advance,
>>>>>>>>> Daniel
>>>>>>>>>

-- 
Ezt az e-mailt átvizsgálta az Avast AntiVirus szoftver.
www.avast.com