Posted to jira@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2021/11/16 21:16:00 UTC

[jira] [Assigned] (KAFKA-9279) Silent data loss in Kafka producer

     [ https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton reassigned KAFKA-9279:
------------------------------------

    Assignee: Chris Egerton

> Silent data loss in Kafka producer
> ----------------------------------
>
>                 Key: KAFKA-9279
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9279
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.3.0
>            Reporter: Andrew Klopper
>            Assignee: Chris Egerton
>            Priority: Major
>
> It appears that producer.commitTransaction() can succeed even when a producer.send() call in the same transaction has failed. The following code reproduces the issue:
> {code:java}
> package org.example.dataloss;
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import java.util.Random;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public class Main {
>     public static void main(final String[] args) {
>         final Properties producerProps = new Properties();
>         if (args.length != 2) {
>             System.err.println("Invalid command-line arguments");
>             System.exit(1);
>         }
>         final String bootstrapServer = args[0];
>         final String topic = args[1];
>         producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
>         producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500000");
>         producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
>         producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000000");
>         producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>         producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "dataloss_01");
>         producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dataloss_01");
>         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
>             producer.initTransactions();
>             producer.beginTransaction();
>             final Random random = new Random();
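>             // 2,000,000 bytes exceeds the max.request.size of 1,000,000 set above, so this send is expected to fail.
>             final byte[] largePayload = new byte[2000000];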
>             random.nextBytes(largePayload);
>             producer.send(
>                 new ProducerRecord<>(
>                     topic,
>                     "large".getBytes(StandardCharsets.UTF_8),
>                     largePayload
>                 ),
>                 (metadata, e) -> {
>                     if (e == null) {
>                         System.out.println("INFO: Large payload succeeded");
>                     } else {
>                         System.err.printf("ERROR: Large payload failed: %s\n", e.getMessage());
>                     }
>                 }
>             );
>             producer.commitTransaction();
>             System.out.println("Commit succeeded");
>         } catch (final Exception e) {
>             System.err.printf("FATAL ERROR: %s%n", e.getMessage());
>         }
>     }
> }
> {code}
> The code prints the following output:
> {code}
> ERROR: Large payload failed: The message is 2000093 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
> Commit succeeded{code}
>  
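One caller-side way to avoid committing after a failed send, sketched here under stated assumptions, is to remember the first exception passed to the send callback and choose between commitTransaction() and abortTransaction() accordingly. SendFailureTracker is a hypothetical helper, not part of the Kafka client API, and this is a workaround sketch rather than the fix for this issue. It uses only the JDK so it runs without a broker; its onCompletion method has the same shape as the producer callback used in the reproduction above.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper (not part of the Kafka client): records the first
 * exception reported to a send callback so the caller can abort the
 * transaction instead of committing it.
 */
final class SendFailureTracker {
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

    /** Same shape as the send callback: (metadata, exception). Metadata is ignored. */
    public void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            // Keep only the first failure; later ones are dropped.
            firstFailure.compareAndSet(null, exception);
        }
    }

    public boolean hasFailed() {
        return firstFailure.get() != null;
    }

    public Exception firstFailure() {
        return firstFailure.get();
    }

    public static void main(String[] args) {
        SendFailureTracker tracker = new SendFailureTracker();
        tracker.onCompletion(null, null); // a successful send
        tracker.onCompletion(null, new IllegalStateException("record too large"));
        tracker.onCompletion(null, new IllegalStateException("second failure"));
        // Intended use with a transactional producer (assumed wiring, shown as comments):
        //   producer.send(record, tracker::onCompletion);
        //   producer.flush(); // wait for outstanding callbacks
        //   if (tracker.hasFailed()) producer.abortTransaction();
        //   else producer.commitTransaction();
        System.out.println(tracker.hasFailed()
                ? "abort: " + tracker.firstFailure().getMessage()
                : "commit");
        // prints "abort: record too large"
    }
}
```

An alternative with the same effect is to keep the Future returned by send() and call get() on it before committing, which rethrows the failure wrapped in an ExecutionException.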


