Posted to users@kafka.apache.org by Kindernay Oliver <ol...@eurowag.com.INVALID> on 2021/01/04 10:22:54 UTC

Offset commit consistently timing out in transaction

Hello,

we are experiencing problems with offset commits timing out on the brokers. This started to happen when we started using transactions: send_offsets_to_transaction periodically (every few minutes) times out. We have a cluster of three brokers and topics with 15 partitions. We use one transactional producer per partition and commit the transaction at least once per second. The timeout seemingly happens at random, each time for a different producer instance and a different broker.
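For context, each per-partition worker follows roughly the pattern below. This is a simplified sketch with placeholder names and configuration, not our actual code:

import time
from confluent_kafka import Consumer, Producer

consumer = Consumer({
    "bootstrap.servers": "broker:9092",           # placeholder
    "group.id": "mu-data-generator",              # placeholder
    "enable.auto.commit": False,
    "isolation.level": "read_committed",
})
producer = Producer({
    "bootstrap.servers": "broker:9092",           # placeholder
    "transactional.id": "mu-data-generator-7",    # one transactional.id per input partition
})

consumer.subscribe(["input-topic"])               # placeholder topic
producer.init_transactions()

while True:
    producer.begin_transaction()
    deadline = time.monotonic() + 1.0             # commit roughly once per second
    while time.monotonic() < deadline:
        msg = consumer.poll(0.1)
        if msg is None or msg.error():
            continue
        producer.produce("output-topic", msg.value(), key=msg.key())   # placeholder topic
    # this is the call that periodically times out
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata())
    producer.commit_transaction()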


  1.  We used to get REQUEST_TIMED_OUT after 5 seconds. I understand that error came from a broker.
  2.  We tried raising offsets.commit.timeout.ms on the brokers to 60 seconds.
  3.  After that change, we get transactional operation timeouts after 60s, with the same periodicity. This is now a client-side error, since the broker takes the full minute, after which we would presumably see the same error from the broker as previously (a sketch of the relevant client-side timeouts follows the log excerpt below).
     *   <class 'cimpl.KafkaException'>/KafkaError{code=_TIMED_OUT,val=-185,str="Transactional operation timed out"}
     *   %5|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Timed out TxnOffsetCommitRequest in flight (after 59943ms, timeout #0)
%4|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%7|1609747761.212|FAIL|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: 1 request(s) timed out: disconnect (after 1544886ms in state UP) (_TIMED_OUT)
%3|1609747761.212|FAIL|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: 1 request(s) timed out: disconnect (after 1544886ms in state UP)
%7|1609747761.212|STATE|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state UP -> DOWN
%7|1609747761.212|STATE|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state DOWN -> INIT
%7|1609747761.212|REQERR|rdkafka#producer-10| [thrd:main]: 10.48.111.102:9092/2: MetadataRequest failed: Local: Timed out: actions Retry
%7|1609747761.312|STATE|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state INIT -> TRY_CONNECT
%7|1609747761.312|RETRY|rdkafka#producer-10| [thrd:10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Moved 1 retry buffer(s) to output queue
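
For reference, the client-side timeouts that seem to line up with that 60s are sketched below. The values shown are the librdkafka defaults as I understand them from its configuration docs; we have not changed them ourselves:

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "broker:9092",           # placeholder
    "transactional.id": "mu-data-generator-7",    # placeholder
    # librdkafka defaults made explicit: both are 60000 ms out of the box.
    # transaction.timeout.ms is also what the coordinator records as
    # txnTimeoutMs=60000 in the broker log below.
    "transaction.timeout.ms": 60000,
    # request-level timeout; plausibly what produces the
    # "Timed out TxnOffsetCommitRequest in flight (after 59943ms)" line above
    "socket.timeout.ms": 60000,
})

# The blocking transactional calls also accept an optional per-call timeout
# in seconds, e.g.:
#   producer.send_offsets_to_transaction(positions, group_metadata, timeout=60.0)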



  1.  Broker system metrics (disk, network, CPU, memory) did not indicate any bottleneck. We nevertheless tried upgrading the broker VMs to a larger size, with no change in behaviour.
  2.  This happens on both our test cluster and our prod cluster. The frequency seems to be lower on the test cluster (it has lower traffic and fewer resources).
  3.  We use Python's confluent_kafka 1.5.0, based on librdkafka 1.5.0.
  4.  The broker package version is confluent-kafka-2.11-2.0.1.
  5.  I enabled the TRACE log level for everything on the test cluster.

This is a trace log from the broker. Client logs indicate that the timed-out operation started at 08:08:21:

[2021-01-04 08:08:15,413] DEBUG TransactionalId mu-data-generator-7 complete transition from PrepareCommit to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:08:15,413] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751) with coordinator epoch 375 for mu-data-generator-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:08:15,413] TRACE [Transaction Marker Channel Manager 2]: Completed transaction for mu-data-generator-7 with coordinator epoch 375, final state after commit: CompleteCommit (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:08:21,097] DEBUG TransactionalId mu-data-generator-7 prepare transition from CompleteCommit to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:08:21,098] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) for transaction id mu-data-generator-7 with coordinator epoch 375 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:08:21,267] DEBUG TransactionalId mu-data-generator-7 complete transition from CompleteCommit to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:08:21,267] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) with coordinator epoch 375 for mu-data-generator-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:26,828] DEBUG TransactionalId mu-data-generator-7 prepare transition from Ongoing to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareEpochFence, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:26,829] DEBUG TransactionalId mu-data-generator-7 prepare transition from Ongoing to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:26,829] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) for transaction id mu-data-generator-7 with coordinator epoch 375 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:26,978] DEBUG TransactionalId mu-data-generator-7 complete transition from Ongoing to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:26,978] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort, topicPartitions=Set(__consumer_offsets-45), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) with coordinator epoch 375 for mu-data-generator-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:26,978] DEBUG TransactionalId mu-data-generator-7 prepare transition from PrepareAbort to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:26,979] INFO [TransactionCoordinator id=2] Completed rollback ongoing transaction of transactionalId: mu-data-generator-7 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
[2021-01-04 08:09:26,979] TRACE [Transaction Marker Channel Manager 2]: Added marker TxnMarkerEntry{producerId=10450009, producerEpoch=6502, coordinatorEpoch=375, result=ABORT, partitions=[__consumer_offsets-45]} for transactional id mu-data-generator-7 to destination broker 2 (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:09:27,229] TRACE [Transaction Marker Channel Manager 2]: Completed sending transaction markers for mu-data-generator-7 as ABORT (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:09:27,229] DEBUG [Transaction Marker Channel Manager 2]: Sending mu-data-generator-7's transaction markers for TransactionMetadata(transactionalId=mu-data-generator-7, producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, state=PrepareAbort, pendingState=Some(CompleteAbort), topicPartitions=Set(), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) with coordinator epoch 375 succeeded, trying to append complete transaction log now (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:09:27,230] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) for transaction id mu-data-generator-7 with coordinator epoch 375 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:27,453] DEBUG TransactionalId mu-data-generator-7 complete transition from PrepareAbort to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:27,453] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(), txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) with coordinator epoch 375 for mu-data-generator-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:27,453] TRACE [Transaction Marker Channel Manager 2]: Completed transaction for mu-data-generator-7 with coordinator epoch 375, final state after commit: CompleteAbort (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-01-04 08:09:38,059] DEBUG TransactionalId mu-data-generator-7 prepare transition from CompleteAbort to TxnTransitMetadata(producerId=10450009, producerEpoch=6503, txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:38,060] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6503, txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) for transaction id mu-data-generator-7 with coordinator epoch 375 to the local transaction log (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:38,852] DEBUG TransactionalId mu-data-generator-7 complete transition from CompleteAbort to TxnTransitMetadata(producerId=10450009, producerEpoch=6503, txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) (kafka.coordinator.transaction.TransactionMetadata)
[2021-01-04 08:09:38,852] DEBUG [Transaction State Manager 2]: Updating mu-data-generator-7's transaction state to TxnTransitMetadata(producerId=10450009, producerEpoch=6503, txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) with coordinator epoch 375 for mu-data-generator-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
[2021-01-04 08:09:38,852] INFO [TransactionCoordinator id=2] Initialized transactionalId mu-data-generator-7 with producerId 10450009 and producer epoch 6503 on partition __transaction_state-29 (kafka.coordinator.transaction.TransactionCoo

Also this might be interesting:

[2021-01-04 08:08:21,097] TRACE [Kafka Request Handler 9 on Broker 2], Kafka request handler 9 on broker 2 handling request Request(processor=5, connectionId=10.48.111.102:9092-10.48.110.74:45394-99, session=Session(User:ANONYMOUS,/10.48.1
[2021-01-04 08:08:21,097] DEBUG TransactionalId mu-data-generator-7 prepare transition from CompleteCommit to TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_
[2021-01-04 08:08:21,097] TRACE [ReplicaManager broker=2] Append [Map(__transaction_state-29 -> [(record=DefaultRecord(offset=0, timestamp=1609747701097, key=23 bytes, value=65 bytes))])] to local log (kafka.server.ReplicaManager)
[2021-01-04 08:08:21,097] TRACE Inserting 158 bytes at end offset 14721706 at position 23266940 with largest timestamp 1609747701097 at shallow offset 14721706 (kafka.log.LogSegment)
[2021-01-04 08:08:21,097] TRACE Appended 158 to /mnt/data/kafka/__transaction_state-29/00000000000014565986.log at end offset 14721706 (kafka.log.LogSegment)
[2021-01-04 08:08:21,097] TRACE [Log partition=__transaction_state-29, dir=/mnt/data/kafka] Appended message set with last offset: 14721706, first offset: Some(14721706), next offset: 14721707, and messages: [(record=DefaultRecord(offset=1
[2021-01-04 08:08:21,097] DEBUG [ReplicaManager broker=2] Request key __transaction_state-29 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
[2021-01-04 08:08:21,098] DEBUG [Partition __transaction_state-29 broker=2] Skipping update high watermark since new hw (offset=14721706 segment=[14565986:23266940]) is not larger than old hw (offset=14721706 segment=[14565986:23266940]).
[2021-01-04 08:08:21,098] TRACE [ReplicaManager broker=2] 158 written to log __transaction_state-29 beginning at offset 14721706 and ending at offset 14721706 (kafka.server.ReplicaManager)
[2021-01-04 08:08:21,098] DEBUG [ReplicaManager broker=2] Produce to local log in 1 ms (kafka.server.ReplicaManager)
[2021-01-04 08:08:21,098] TRACE Initial partition status for __transaction_state-29 is [acksPending: true, error: 7, startOffset: 14721706, requiredOffset: 14721707] (kafka.server.DelayedProduce)
[2021-01-04 08:08:21,098] TRACE Checking produce satisfaction for __transaction_state-29, current status [acksPending: true, error: 7, startOffset: 14721706, requiredOffset: 14721707] (kafka.server.DelayedProduce)
[2021-01-04 08:08:21,098] TRACE [Partition __transaction_state-29 broker=2] Progress awaiting ISR acks for offset 14721707: acked: Set(broker 2: 14721707), awaiting Set(broker 3: 14721706) (kafka.cluster.Partition)
[2021-01-04 08:08:21,098] TRACE Checking produce satisfaction for __transaction_state-29, current status [acksPending: true, error: 7, startOffset: 14721706, requiredOffset: 14721707] (kafka.server.DelayedProduce)
[2021-01-04 08:08:21,098] TRACE [Partition __transaction_state-29 broker=2] Progress awaiting ISR acks for offset 14721707: acked: Set(broker 2: 14721707), awaiting Set(broker 3: 14721706) (kafka.cluster.Partition)
[2021-01-04 08:08:21,098] TRACE [Transaction State Manager 2]: Appending new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6501, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_offsets-45), txnStartTim



Apart from upgrading the Kafka version, or moving to a newer librdkafka once a new confluent_kafka release for Python comes out, I am out of ideas about what to try.

I would be grateful for any suggestions on where to look to debug this kind of issue.

Re: Offset commit consistently timing out in transaction

Posted by Kamal Chandraprakash <ka...@gmail.com>.
The timeout in the offset commit request has been fixed recently; this issue looks like KAFKA-8334 <https://issues.apache.org/jira/browse/KAFKA-8334>.
