Posted to jira@kafka.apache.org by "Travis Bischel (Jira)" <ji...@apache.org> on 2021/04/17 00:15:00 UTC

[jira] [Commented] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

    [ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324135#comment-17324135 ] 

Travis Bischel commented on KAFKA-12671:
----------------------------------------

I've mostly worked around this issue in my client with this commit: https://github.com/twmb/franz-go/commit/10b743ed5cefa0ef321ce7c2bd12bb19ff529f28

Quoting the commit message,
"""
What we do is, when aborting, we wait for all sinks to be completely
done with their produce requests. This solves most of the problem to
begin with: before, we did not wait, so if we issued a produce request
we could immediately proceed to issuing EndTxn, and the EndTxn may be
handled first. By waiting, we _ensure_ that Kafka has handled our
produce requests.

However, if we wait and the request is cut, it may be that our
connection died right after writing the produce request. In this case,
we now see that the last produce request had an issuing error, and we
wait 1s before sending EndTxn. The hope is that this 1s is enough time
for the ProduceRequest to be processed by Kafka itself.

We have effectively changed this issue from a slim chance to an
extremely slim chance under very bad conditions.
"""

I do think that this issue is worth solving within the broker itself.
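
To make the broker-side argument concrete, here is a toy model of the race from the issue quoted below. It is only a sketch: `stateManager` is an invented type, not Kafka's ProducerStateManager, the offsets are simplified (a marker takes one offset), and the `strict` flag is an assumed rendering of the proposed fix of rejecting produce requests when no transaction has begun through AddPartitionsToTxn.

```go
package main

import (
	"errors"
	"fmt"
)

// stateManager is a toy, single-partition model of transaction tracking.
type stateManager struct {
	open       map[int64]int64 // producer ID -> first offset of its open transaction
	added      map[int64]bool  // producers that went through AddPartitionsToTxn
	strict     bool            // hypothetical fix: reject data when nothing was added
	nextOffset int64           // log end offset
}

func newStateManager(strict bool) *stateManager {
	return &stateManager{open: map[int64]int64{}, added: map[int64]bool{}, strict: strict}
}

func (m *stateManager) addPartitions(pid int64) { m.added[pid] = true }

// appendData appends n records; the first batch for a producer opens a transaction.
func (m *stateManager) appendData(pid, n int64) error {
	if m.strict && !m.added[pid] {
		return errors.New("produce outside of a transaction rejected")
	}
	if _, ok := m.open[pid]; !ok {
		m.open[pid] = m.nextOffset
	}
	m.nextOffset += n
	return nil
}

// appendMarker writes a commit/abort marker and closes the producer's transaction.
func (m *stateManager) appendMarker(pid int64) {
	delete(m.open, pid)
	delete(m.added, pid)
	m.nextOffset++
}

// lastStableOffset is the first offset of the earliest open transaction,
// or the log end offset when no transaction is open.
func (m *stateManager) lastStableOffset() int64 {
	lso := m.nextOffset
	for _, first := range m.open {
		if first < lso {
			lso = first
		}
	}
	return lso
}

// replay runs the problematic ordering: the second produce request is
// handled after the transaction marker.
func replay(strict bool) (lso, logEnd int64, lateErr error) {
	const pid = 1463
	m := newStateManager(strict)
	m.addPartitions(pid)
	_ = m.appendData(pid, 10)       // first produce, handled in order
	m.appendMarker(pid)             // EndTxn -> WriteTxnMarkers
	lateErr = m.appendData(pid, 10) // second produce, handled late
	return m.lastStableOffset(), m.nextOffset, lateErr
}

func main() {
	for _, strict := range []bool{false, true} {
		lso, end, err := replay(strict)
		fmt.Printf("strict=%v lso=%d logEnd=%d lateErr=%v\n", strict, lso, end, err)
	}
}
```

Without the check, the late batch opens a transaction that no marker will ever close, so the last stable offset stays behind the log end. With the check, the late batch is rejected and the two offsets stay equal.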

> Out of order processing with a transactional producer can lead to a stuck LastStableOffset
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12671
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12671
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Travis Bischel
>            Priority: Major
>
> If there is pathological processing of incoming produce requests and EndTxn requests, then the LastStableOffset can get stuck, which will block consuming in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is responsible for fencing and adding partitions to a transaction, and the end transaction is responsible for finishing the transaction. Producing itself is mostly uninvolved with the proper fencing / ending flow, but produce requests are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager to mildly manage transactions. The ProducerStateManager is completely independent of the TxnCoordinator, and its guarantees are relatively weak. The ProducerStateManager handles two types of "batches" being added: a data batch and a transaction marker. When a data batch is added, a "transaction" is begun and tied to the producer ID that is producing the batch. When a transaction marker is handled, the ProducerStateManager removes the transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the ProducerStateManager. In essence, EndTxn is the one part of the transactional producer flow that talks across both the TxnCoordinator and the ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka after EndTxn, then the ProduceRequest will begin a new transaction in the ProducerStateManager. If the client was disconnecting, and the EndTxn was the final request issued, the new transaction created in ProducerStateManager is orphaned and nothing can clean it up. The LastStableOffset then hangs based off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a transactional ID outside of the context of a transaction at all (no AddPartitionsToTxn). This problem cannot be triggered by producing for so long that the transaction expires; the difference here is that the transaction coordinator bumps the epoch for the producer ID, thus producing again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, but in the context of wanting to abort everything and shut down, this is not always feasible. As it currently stands, I'm not sure there's a truly safe way to shut down _without_ flushing and receiving responses for every record produced, even if I want to abort everything and quit. The safest approach I can think of is to actually avoid issuing an EndTxn so that instead we rely on Kafka internally to time things out after a period of time.
> —
> For some context, here are my request logs from the client. Note that I write two ProduceRequests, read one, and then issue EndTxn (because I know I want to quit). The second ProduceRequest is read successfully before shutdown, but I ignore the results because I am shutting down. I've taken out logs related to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: <nil>
> [DEBUG] read FindCoordinator v3; err: <nil>
> [DEBUG] wrote InitProducerID v4; err: <nil>
> [DEBUG] read InitProducerID v4; err: <nil>
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: <nil>
> [DEBUG] read AddPartitionsToTxn v2; err: <nil>
> [DEBUG] wrote Produce v8; err: <nil>
> [DEBUG] read Produce v8; err: <nil>
> [DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: <nil>
> [DEBUG] wrote EndTxn v2; err: <nil>
> [DEBUG] read EndTxn v2; err: <nil>
> [DEBUG] read from broker errored, killing connection; addr: localhost:9092, id: 1, successful_reads: 1, err: context canceled
> [DEBUG] read Produce v8; err: <nil>
> [DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}]
> {noformat}
> And then from the broker's point of view. Across two brokers, the second ProduceRequest is completed after EndTxn is handled (and after the WriteTxnMarkers request is handled, which is the important one that hooks into the ProducerStateManager):
> {noformat}
> /// Broker 3: init producer ID
> [2021-04-15 00:56:40,030] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo, correlationId=3) -- {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,transaction_timeout_ms=60000,producer_id=-1,producer_epoch=-1,_tagged_fields={}},response:{throttle_time_ms=0,error_code=0,producer_id=1463,producer_epoch=0,_tagged_fields={}} from connection 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:2.255,requestQueueTime:0.077,localTime:0.74,remoteTime:0.095,throttleTime:0,responseQueueTime:1.005,sendTime:0.336,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0) (kafka.request.logger)
> /// Broker 3: add partitions to txn
> [2021-04-15 00:56:40,071] DEBUG Completed request:RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=2, clientId=kgo, correlationId=4) -- {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[1]}]},response:{throttle_time_ms=0,results=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,results=[{partition_index=1,error_code=0}]}]} from connection 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:1.247,requestQueueTime:0.133,localTime:0.71,remoteTime:0.136,throttleTime:0,responseQueueTime:0.087,sendTime:0.178,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0) (kafka.request.logger)
> /// Broker 2: first produce
> [2021-04-15 00:56:40,223] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo, correlationId=1) -- {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=15589,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0} from connection 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:2.705,requestQueueTime:0.055,localTime:2.435,remoteTime:0.058,throttleTime:0,responseQueueTime:0.055,sendTime:0.1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0),temporaryMemoryBytes:324898 (kafka.request.logger)
> /// Broker 3: end txn
> [2021-04-15 00:56:40,350] DEBUG Completed request:RequestHeader(apiKey=END_TXN, apiVersion=2, clientId=kgo, correlationId=5) -- {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,committed=false},response:{throttle_time_ms=0,error_code=0} from connection 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:3.484,requestQueueTime:0.052,localTime:0.318,remoteTime:0.06,throttleTime:0,responseQueueTime:2.92,sendTime:0.133,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0) (kafka.request.logger)
> /// Broker 2: txn markers
> [2021-04-15 00:56:40,357] DEBUG Completed request:RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=0, clientId=broker-3-txn-marker-sender, correlationId=66708) -- {markers=[{producer_id=1463,producer_epoch=0,transaction_result=false,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_indexes=[1]}],coordinator_epoch=0}]},response:{markers=[{producer_id=1463,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[{partition_index=1,error_code=0}]}]}]} from connection 127.0.0.1:9094-127.0.0.1:38966-676;totalTime:3.507,requestQueueTime:1.957,localTime:0.34,remoteTime:0.031,throttleTime:0,responseQueueTime:0.324,sendTime:0.853,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=unknown, softwareVersion=unknown) (kafka.request.logger)
> /// Broker 2: second produce
> [2021-04-15 00:56:40,374] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo, correlationId=2) -- {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=19687,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0} from connection 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:4.45,requestQueueTime:0.603,localTime:2.721,remoteTime:0.051,throttleTime:0,responseQueueTime:0.043,sendTime:1.031,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0),temporaryMemoryBytes:356824 (kafka.request.logger)
> {noformat}
> —
> I believe that one fix for this would be to only allow transactions to start in the ProducerStateManager if a transaction has actually begun through AddPartitionsToTxn, and to reject produce requests to partitions that have not been added to a txn. An alternative fix would be to just wait for all produce requests to finish before issuing EndTxn, but this seems less desirable when wanting to shut down and abort progress. Another alternative is to avoid issuing EndTxn and to just shut down, but this also seems undesirable and will block consumers until the transaction timeout expires.
> This may be the cause of KAFKA-5880.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)