Posted to users@kafka.apache.org by "Phadnis, Varun" <ph...@sky.optymyze.com> on 2016/11/22 07:15:54 UTC

Kafka producer dropping records

Hello,

We have the following piece of code where we read lines from a file and push them to a Kafka topic:

        Properties properties = new Properties();
        properties.put("bootstrap.servers", <bootstrapServers>);
        properties.put("key.serializer", StringSerializer.class.getCanonicalName());
        properties.put("value.serializer", StringSerializer.class.getCanonicalName());
        properties.put("retries", 100);
        properties.put("acks", "all");

        KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);

        try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
            String line;
            int count = 0;
            while ((line = bf.readLine()) != null) {
                count++;
                producer.send(new ProducerRecord<>(topicName, line));
            }
            Logger.log("Done producing data messages. Total no of records produced: " + count);
        } catch (InterruptedException | ExecutionException | IOException e) {
            Throwables.propagate(e);
        } finally {
            producer.close();
        }

When we try this with a large file with a million records, only about half of them (around 500,000) get written to the topic. In the above example, I verified this by running the GetOffset tool after a fair amount of time (to ensure all records had finished processing) as follows:


        ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>




The output of this was:

        topic_name:1:292954
        topic_name:0:296787


What could be causing this dropping of records?

Thanks,
Varun

Re: Kafka producer dropping records

Posted by Jaikiran Pai <ja...@gmail.com>.
That tells you that the acknowledgements (which, in your case, you have 
configured to come from all brokers in the ISR) aren't happening, and 
that essentially means the records aren't making it to the topic. How 
many brokers do you have? What's the replication factor on the topic, 
and which brokers are in the ISR for it? Ultimately, you have to figure 
out why these ACKs aren't happening.

-Jaikiran
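
For reference, one way to check the replication factor and current ISR on a cluster of this vintage is the topic describe tool (the ZooKeeper host and topic name below are placeholders):

```shell
# Describe the topic: prints the partition count, replication factor,
# and, per partition, the leader, the replica set, and the current ISR.
./kafka-topics.sh --describe --zookeeper <zookeeper_host>:2181 --topic <topic_name>
```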

On Tuesday 22 November 2016 02:26 PM, Phadnis, Varun wrote:
> Hello,
>
> We had tried that... If future.get() is added in the while loop, it takes too long for the loop to execute.
>
> Last time we tried it, it was running for that file for over 2 hours and still not finished.
>
> Regards,
> Varun
>


Re: Kafka producer dropping records

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Varun,

You could increase `retries`, but it seems you have already configured it
to be `100`. Another option is to increase `retry.backoff.ms`, which will
increase the time between retries.

Ismael
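
As a sketch, the producer settings involved would look like this (the values here are illustrative, not recommendations):

```java
Properties properties = new Properties();
properties.put("retries", 100);
// Wait longer between retry attempts (the 0.10.x default is 100 ms).
properties.put("retry.backoff.ms", "500");
// Allow more time for requests (including metadata fetches) to complete
// before a batch is expired (the default is 30000 ms).
properties.put("request.timeout.ms", "120000");
properties.put("acks", "all");
```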

On Fri, Nov 25, 2016 at 9:38 AM, Phadnis, Varun <ph...@sky.optymyze.com>
wrote:

> Hello,
>
> Sorry for the late response. We tried logging the errors received in the
> callback, and the result is that we are facing TimeoutExceptions:
>
>         org.apache.kafka.common.errors.TimeoutException: Batch containing
> 93 record(s) expired due to timeout while requesting metadata from brokers
> for mp_test2-1
>
> Increasing request.timeout.ms to 100000 (from the default of 30000) stopped
> the messages from being dropped. However, that seems like a solution that
> would not scale if an unpredictable "burst" of network slowness caused a
> longer delay.
>
> Is there a better way to handle this? Is there any other producer/broker
> configuration I could tweak to increase the reliability of the producer?
>
> Thanks,
> Varun
>

RE: Kafka producer dropping records

Posted by "Phadnis, Varun" <ph...@sky.optymyze.com>.
Hello,

Sorry for the late response. We tried logging the errors received in the callback, and the result is that we are facing TimeoutExceptions:

	org.apache.kafka.common.errors.TimeoutException: Batch containing 93 record(s) expired due to timeout while requesting metadata from brokers for mp_test2-1

Increasing request.timeout.ms to 100000 (from the default of 30000) stopped the messages from being dropped. However, that seems like a solution that would not scale if an unpredictable "burst" of network slowness caused a longer delay.

Is there a better way to handle this? Is there any other producer/broker configuration I could tweak to increase the reliability of the producer? 

Thanks,
Varun

-----Original Message-----
From: ismaelj@gmail.com [mailto:ismaelj@gmail.com] On Behalf Of Ismael Juma
Sent: 22 November 2016 08:31
To: Kafka Users <us...@kafka.apache.org>
Subject: Re: Kafka producer dropping records

Another option, which is probably easier, is to pass a callback to `send` and log errors.

Ismael


Re: Kafka producer dropping records

Posted by Ismael Juma <is...@juma.me.uk>.
Another option, which is probably easier, is to pass a callback to `send`
and log errors.

Ismael
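
A minimal sketch of that approach, reusing the `Logger`, reader loop, and variables from the original code (the callback body is illustrative):

```java
while ((line = bf.readLine()) != null) {
    count++;
    producer.send(new ProducerRecord<>(topicName, line), (metadata, exception) -> {
        if (exception != null) {
            // The record was not acknowledged; surface the failure
            // instead of silently dropping it.
            Logger.log("Failed to send record: " + exception);
        }
    });
}
```

The send remains asynchronous, so throughput is unaffected; failures simply become visible in the log.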

On Tue, Nov 22, 2016 at 10:33 AM, Ismael Juma <is...@juma.me.uk> wrote:

> You can collect the Futures and call `get` in batches. That would give you
> access to the errors without blocking on each request.
>
> Ismael

Re: Kafka producer dropping records

Posted by Ismael Juma <is...@juma.me.uk>.
You can collect the Futures and call `get` in batches. That would give you
access to the errors without blocking on each request.

Ismael
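
Applied to the original loop, that suggestion might look like the following sketch (the batch size of 10,000 is arbitrary):

```java
List<Future<RecordMetadata>> futures = new ArrayList<>();
String line;
int count = 0;
while ((line = bf.readLine()) != null) {
    count++;
    futures.add(producer.send(new ProducerRecord<>(topicName, line)));
    if (futures.size() == 10000) {
        // Drain the batch: get() throws ExecutionException if a send failed.
        for (Future<RecordMetadata> future : futures) {
            future.get();
        }
        futures.clear();
    }
}
// Drain whatever remains after the last full batch.
for (Future<RecordMetadata> future : futures) {
    future.get();
}
```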

On Tue, Nov 22, 2016 at 8:56 AM, Phadnis, Varun <ph...@sky.optymyze.com>
wrote:

> Hello,
>
> We had tried that... If future.get() is added in the while loop, it takes
> too long for the loop to execute.
>
> Last time we tried it, it was running for that file for over 2 hours and
> still not finished.
>
> Regards,
> Varun

RE: Kafka producer dropping records

Posted by "Phadnis, Varun" <ph...@sky.optymyze.com>.
Hello,

We had tried that... If future.get() is added in the while loop, it takes too long for the loop to execute. 

Last time we tried it, it was running for that file for over 2 hours and still not finished.

Regards,
Varun

-----Original Message-----
From: Jaikiran Pai [mailto:jai.forums2013@gmail.com] 
Sent: 22 November 2016 02:20
To: users@kafka.apache.org
Subject: Re: Kafka producer dropping records

The KafkaProducer.send returns a Future<RecordMetadata>. What happens when you add a future.get() on the returned Future, in that while loop, for each sent record?

-Jaikiran



Re: Kafka producer dropping records

Posted by Jaikiran Pai <ja...@gmail.com>.
The KafkaProducer.send returns a Future<RecordMetadata>. What happens 
when you add a future.get() on the returned Future, in that while loop, 
for each sent record?

-Jaikiran
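
In code, that change amounts to blocking on each send inside the loop (note this serializes the sends, which makes it a diagnostic rather than a fix):

```java
while ((line = bf.readLine()) != null) {
    count++;
    // get() blocks until the record is acknowledged, and throws
    // ExecutionException if the send ultimately failed.
    producer.send(new ProducerRecord<>(topicName, line)).get();
}
```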
