Posted to dev@kafka.apache.org by "Apurva Mehta (JIRA)" <ji...@apache.org> on 2017/06/06 06:12:18 UTC

[jira] [Updated] (KAFKA-5385) Transactional Producer allows batches to expire and commits transactions regardless

     [ https://issues.apache.org/jira/browse/KAFKA-5385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apurva Mehta updated KAFKA-5385:
--------------------------------
    Description: 
The transactions system test has revealed a data loss issue. When the cluster is unstable, the transactional requests (AddPartitions and AddOffsets) can retry for a long time. When they eventually succeed, the commit message is dequeued, at which point we try to drain the accumulator. By then, however, the batches have exceeded their expiry time, so we drop them and commit the transaction anyway. This causes data loss.

Relevant portion from the producer log is here: 

{noformat}
[2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION (org.apache.kafka.clients.producer.internals.TransactionManager)
[2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id] Enqueuing transactional request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) (org.apache.kafka.clients.producer.internals.TransactionManager)
[2017-06-06 01:07:36,276] TRACE Expired 3 batches in accumulator (org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2017-06-06 01:07:36,286] TRACE Produced messages to topic-partition output-topic-0 with base offset offset -1 and error: {}. (org.apache.kafka.clients.producer.internals.ProducerBatch)
org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for output-topic-0: 39080 ms has passed since batch creation plus linger time
[2017-06-06 01:07:36,424] TRACE Produced messages to topic-partition output-topic-1 with base offset offset -1 and error: {}. (org.apache.kafka.clients.producer.internals.ProducerBatch)
org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for output-topic-1: 39080 ms has passed since batch creation plus linger time
[2017-06-06 01:07:36,436] TRACE Produced messages to topic-partition output-topic-2 with base offset offset -1 and error: {}. (org.apache.kafka.clients.producer.internals.ProducerBatch)
org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for output-topic-2: 39080 ms has passed since batch creation plus linger time
[2017-06-06 01:07:36,444] TRACE [TransactionalId my-first-transactional-id] Request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) dequeued for sending (org.apache.kafka.clients.producer.internals.TransactionManager)
[2017-06-06 01:07:36,446] DEBUG [TransactionalId my-first-transactional-id] Sending transactional request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) to node knode04:9092 (id: 3 rack: null) (org.apache.kafka.clients.producer.internals.Sender)
[2017-06-06 01:07:36,449] TRACE [TransactionalId my-first-transactional-id] Received transactional response EndTxnResponse(error=NOT_COORDINATOR, throttleTimeMs=0) for request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) (org.apache.kafka.clients.producer.internals.TransactionManager)
{noformat}

As the log shows, the commit goes ahead even though the batches are never sent. In this test, we lost 750 messages in the output topic (the 3 expired batches of 250 records each), and they correspond exactly to the 750 messages in the input topic at the offset in this portion of the log.

The solution is to either never expire transactional batches, or fail the transaction if any batches have expired. 
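
To make the second option concrete, here is a minimal sketch of "fail the transaction if any batches have expired". It is a toy Python model, not the Kafka producer's Java code: the names ToyTransactionalProducer, Batch, TransactionAbortedError and expiry_ms are all hypothetical, and the 30000 ms expiry used in the usage note is an assumed value chosen only so that the 39080 ms seen in the log exceeds it.

```python
from dataclasses import dataclass, field


class TransactionAbortedError(Exception):
    """Raised when a commit is refused because some records were never sent."""


@dataclass
class Batch:
    partition: str
    record_count: int
    created_ms: int


@dataclass
class ToyTransactionalProducer:
    # Hypothetical expiry limit in milliseconds; not a real Kafka config name.
    expiry_ms: int
    pending: list = field(default_factory=list)
    sent_records: int = 0
    expired_records: int = 0

    def append(self, batch):
        """Queue a batch in the (toy) accumulator."""
        self.pending.append(batch)

    def drain(self, now_ms):
        """Send batches that are still fresh; count the records of expired ones."""
        for batch in self.pending:
            if now_ms - batch.created_ms > self.expiry_ms:
                # The batch is dropped, but the loss is remembered.
                self.expired_records += batch.record_count
            else:
                self.sent_records += batch.record_count
        self.pending.clear()

    def commit(self, now_ms):
        """Drain the accumulator, then commit only if nothing expired."""
        self.drain(now_ms)
        if self.expired_records > 0:
            # The bug described above corresponds to skipping this check.
            raise TransactionAbortedError(
                f"{self.expired_records} record(s) expired before being sent"
            )
        return "COMMITTED"
```

With three batches of 250 records created at time 0, expiry_ms=30000 and a commit attempted at 39080 ms, commit() raises TransactionAbortedError reporting 750 expired records instead of returning "COMMITTED". The first option (never expiring transactional batches) would instead amount to skipping the expiry comparison in drain() for transactional producers.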




> Transactional Producer allows batches to expire and commits transactions regardless
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-5385
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5385
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Apurva Mehta
>            Priority: Blocker
>             Fix For: 0.11.0.0
>
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)