You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2021/01/29 15:35:00 UTC

[jira] [Resolved] (KAFKA-6876) Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss

     [ https://issues.apache.org/jira/browse/KAFKA-6876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-6876.
----------------------------------
    Resolution: Duplicate

> Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-6876
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6876
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.1, 1.1.0, 2.0.1
>         Environment: Linux, JDK 8
>            Reporter: Paul Davidson
>            Priority: Major
>
> The producer callback in "WorkerSourceTask" handles exceptions during a send() by logging at ERROR level and continuing.  This can lead to offsets being committed for records that were never sent correctly.  The records are effectively skipped, leading to data loss in our use case.   
> The source code for the Callback "onCompletion()" method suggests this should "basically never happen ... callbacks with exceptions should never be invoked in practice", but we have seen this happen several times in production, especially in near heap-exhaustion situations when the Sender thread generates an exception (often caused by KAFKA-6551).
> From WorkerSourceTask line 253:
> {code:java}
> new Callback() {
>    @Override
>    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
>        if (e != null) {
>            // Given the default settings for zero data loss, this should basically never happen --
>            // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
>            // timeouts, callbacks with exceptions should never be invoked in practice. If the
>            // user overrode these settings, the best we can do is notify them of the failure via
>            // logging.
>            log.error("{} failed to send record to {}: {}", this, topic, e);
>            log.debug("{} Failed record: {}", this, preTransformRecord);
>        } else {
>           log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
>                    this,
>                    recordMetadata.topic(), recordMetadata.partition(),
>                    recordMetadata.offset());
>            commitTaskRecord(preTransformRecord);
>        }
>        recordSent(producerRecord);
>        counter.completeRecord();
>    }
> }
> {code}
>  
> Example of an exception triggering the bug:
> {code:java}
> 2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
> java.lang.IllegalStateException: Producer is closed forcefully.
>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
>         at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)