Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/06/07 05:05:18 UTC

[jira] [Commented] (KAFKA-5385) Transactional Producer allows batches to expire and commits transactions regardless

    [ https://issues.apache.org/jira/browse/KAFKA-5385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040179#comment-16040179 ] 

ASF GitHub Bot commented on KAFKA-5385:
---------------------------------------

GitHub user apurvam opened a pull request:

    https://github.com/apache/kafka/pull/3252

    KAFKA-5385: ProducerBatch expiry should go through Sender.failBatch

    Before this patch, we would call `producerBatch.done` directly from the accumulator when expiring batches. This meant that we would not transition to the `ABORTABLE_ERROR` state in the transaction manager, allowing other transactional requests (including commits!) to go through even though the produce had failed.
    
    This patch modifies the logic so that we call `Sender.failBatch` on every expired batch, thus ensuring that the transaction state is accurate. 
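
    A minimal sketch of the intended control flow (class and method names follow this description; the bodies are illustrative stubs, not the actual Kafka internals):

        // Sketch only: stand-ins for the real client classes.
        class ProducerBatch {
            void done(long baseOffset, RuntimeException exception) {
                // In the real client this completes the futures of every
                // record in the batch, successfully or exceptionally.
            }
        }

        class TransactionManager {
            enum State { IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTABLE_ERROR }

            private State state = State.IN_TRANSACTION;

            // Once any batch fails, the transaction may only be aborted.
            synchronized void transitionToAbortableError(RuntimeException cause) {
                state = State.ABORTABLE_ERROR;
            }

            synchronized void beginCommit() {
                if (state == State.ABORTABLE_ERROR)
                    throw new IllegalStateException("Cannot commit: a batch in this transaction failed");
                state = State.COMMITTING_TRANSACTION;
            }
        }

        class Sender {
            private final TransactionManager transactionManager;

            Sender(TransactionManager transactionManager) {
                this.transactionManager = transactionManager;
            }

            // After this patch, expired batches are routed through here instead
            // of having the accumulator call producerBatch.done directly, so the
            // transaction manager learns about the failure before any commit.
            void failBatch(ProducerBatch batch, RuntimeException exception) {
                if (transactionManager != null)
                    transactionManager.transitionToAbortableError(exception);
                batch.done(-1L /* unknown base offset */, exception);
            }
        }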

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apurvam/kafka KAFKA-5385-fail-transaction-if-batches-expire

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3252.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3252
    
----
commit 20768261ad83c8ce13ab135d22907d3f35013e34
Author: Apurva Mehta <ap...@confluent.io>
Date:   2017-06-06T22:33:33Z

    WIP

commit 3b50c5ed56cb696c708c59676f818f4bc0a3a3be
Author: Apurva Mehta <ap...@confluent.io>
Date:   2017-06-07T04:50:44Z

    Batch expiry should go through Sender.failBatch so that the transactional state is set correctly

----


> Transactional Producer allows batches to expire and commits transactions regardless
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-5385
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5385
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>            Priority: Blocker
>              Labels: exactly-once
>             Fix For: 0.11.0.0
>
>
> The transactions system test has revealed a data loss issue. When there is cluster instability, the transactional requests (AddPartitions and AddOffsets) can retry for a long time. When they eventually succeed, the commit message is dequeued, at which point we try to drain the accumulator. However, we then find that the batches have already expired, so we simply drop them, yet commit the transaction anyway. This causes data loss.
> Relevant portion from the producer log is here: 
> {noformat}
> [2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id] Enqueuing transactional request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,276] TRACE Expired 3 batches in accumulator (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2017-06-06 01:07:36,286] TRACE Produced messages to topic-partition output-topic-0 with base offset offset -1 and error: {}. (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for output-topic-0: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,424] TRACE Produced messages to topic-partition output-topic-1 with base offset offset -1 and error: {}. (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for output-topic-1: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,436] TRACE Produced messages to topic-partition output-topic-2 with base offset offset -1 and error: {}. (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for output-topic-2: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,444] TRACE [TransactionalId my-first-transactional-id] Request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) dequeued for sending (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,446] DEBUG [TransactionalId my-first-transactional-id] Sending transactional request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) to node knode04:9092 (id: 3 rack: null) (org.apache.kafka.clients.producer.internals.Sender)
> [2017-06-06 01:07:36,449] TRACE [TransactionalId my-first-transactional-id] Received transactional response EndTxnResponse(error=NOT_COORDINATOR, throttleTimeMs=0) for request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) (org.apache.kafka.clients.producer.internals.TransactionManager)
> {noformat}
> As you can see, the commit goes ahead even though the batches were never sent. In this test, we lost 750 messages in the output topic; they correspond exactly to the 750 messages in the input topic at the offsets covered by this portion of the log.
> The solution is either to never expire transactional batches, or to fail the transaction if any batches have expired.
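>
> From the application's point of view, the fix means an expired batch must surface as a failed commit. A hedged sketch of the expected client-side behavior (broker address and topic name are illustrative, and error handling is simplified):
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.KafkaException;
>
> public class TransactionalSendExample {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092"); // illustrative
>         props.put("transactional.id", "my-first-transactional-id");
>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>
>         Producer<String, String> producer = new KafkaProducer<>(props);
>         producer.initTransactions();
>         try {
>             producer.beginTransaction();
>             for (int i = 0; i < 750; i++)
>                 producer.send(new ProducerRecord<>("output-topic", Integer.toString(i)));
>             // With the fix, an expired batch moves the transaction into an
>             // abortable error state, so this call throws instead of silently
>             // committing a transaction that lost records.
>             producer.commitTransaction();
>         } catch (KafkaException e) {
>             // The transaction cannot be committed; abort it (and retry if desired).
>             producer.abortTransaction();
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}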


