Posted to jira@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2021/11/16 21:16:00 UTC
[jira] [Assigned] (KAFKA-9279) Silent data loss in Kafka producer
[ https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton reassigned KAFKA-9279:
------------------------------------
Assignee: Chris Egerton
> Silent data loss in Kafka producer
> ----------------------------------
>
> Key: KAFKA-9279
> URL: https://issues.apache.org/jira/browse/KAFKA-9279
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 2.3.0
> Reporter: Andrew Klopper
> Assignee: Chris Egerton
> Priority: Major
>
> It appears that it is possible for a producer.commitTransaction() call to succeed even if an individual producer.send() call has failed. The following code demonstrates the issue:
> {code:java}
> package org.example.dataloss;
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import java.util.Random;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public class Main {
>     public static void main(final String[] args) {
>         final Properties producerProps = new Properties();
>         if (args.length != 2) {
>             System.err.println("Invalid command-line arguments");
>             System.exit(1);
>         }
>         final String bootstrapServer = args[0];
>         final String topic = args[1];
>         producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
>         producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500000");
>         producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
>         producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000000");
>         producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>         producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "dataloss_01");
>         producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dataloss_01");
>         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
>             producer.initTransactions();
>             producer.beginTransaction();
>             final Random random = new Random();
>             final byte[] largePayload = new byte[2000000];
>             random.nextBytes(largePayload);
>             producer.send(
>                 new ProducerRecord<>(
>                     topic,
>                     "large".getBytes(StandardCharsets.UTF_8),
>                     largePayload
>                 ),
>                 (metadata, e) -> {
>                     if (e == null) {
>                         System.out.println("INFO: Large payload succeeded");
>                     } else {
>                         System.err.printf("ERROR: Large payload failed: %s\n", e.getMessage());
>                     }
>                 }
>             );
>             producer.commitTransaction();
>             System.out.println("Commit succeeded");
>         } catch (final Exception e) {
>             System.err.printf("FATAL ERROR: %s", e.getMessage());
>         }
>     }
> }
> {code}
> The code prints the following output:
> {code:java}
> ERROR: Large payload failed: The message is 2000093 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
> Commit succeeded
> {code}
>
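One application-side guard against this behaviour is to check the result of every send before committing. The following is a minimal sketch of that check under stated assumptions, not the fix for this issue. It uses only the JDK so it can run without a broker: `CompletableFuture` instances stand in for the `Future<RecordMetadata>` values that `producer.send()` returns. `TxnGuard` and `firstSendFailure` are hypothetical names introduced here for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class TxnGuard {
    /**
     * Hypothetical helper, not part of the Kafka client API.
     * Waits for each send result in turn and returns the cause of the first
     * failure, or null if every send succeeded. An interrupt is also treated
     * as "not safe to commit" and returned as the failure.
     */
    public static Throwable firstSendFailure(List<? extends Future<?>> sends) {
        for (Future<?> send : sends) {
            try {
                send.get(); // blocks until this send has completed
            } catch (ExecutionException e) {
                return e.getCause(); // e.g. the record-too-large error
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return e;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Stand-ins (assumption) for the futures returned by producer.send(...).
        CompletableFuture<String> ok = CompletableFuture.completedFuture("acked");
        CompletableFuture<String> tooLarge = new CompletableFuture<>();
        tooLarge.completeExceptionally(new IllegalStateException("record too large"));

        Throwable failure = firstSendFailure(List.of(ok, tooLarge));
        if (failure == null) {
            System.out.println("commit"); // would call producer.commitTransaction()
        } else {
            // would call producer.abortTransaction()
            System.out.println("abort: " + failure.getMessage());
        }
    }
}
```

In the reproducer, this approach would keep the `Future<RecordMetadata>` returned by each `producer.send(...)` call. It could optionally call `producer.flush()` first so that buffered batches go out without waiting for `linger.ms`. It would then call `producer.abortTransaction()` instead of `producer.commitTransaction()` whenever the helper returns a non-null cause. Because `Future.get()` blocks until the send completes, the check does not depend on the callback having already run.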
--
This message was sent by Atlassian Jira
(v8.20.1#820001)