Posted to users@kafka.apache.org by R Krishna <kr...@gmail.com> on 2016/06/14 03:32:24 UTC

async producer retry behavior - at least once guarantee

As part of testing the at-least-once guarantees of Kafka v0.9, we disconnected
the producer's network and found that the configured retries
(retries=10000000) are not happening. We get this warning:

WARN  kafka-producer-network-thread | producer-1
[.kafka.clients.producer.internals.Sender]  - Got error produce response
with correlation id 6474 on topic-partition test-topic-3-100-38, retrying
(9999999 attempts left). Error: NETWORK_EXCEPTION

followed by:

org.apache.kafka.common.errors.TimeoutException: Batch Expired

We tried debugging with conditional breakpoints in the RecordAccumulator and
RecordBatch classes that stop when batch.attempts > 1, but they never fire:
attempts never goes beyond 1, which is where the batch is re-enqueued, even
though canRetry() always returns true. The relevant method is
clients.producer.internals.Sender.completeBatch(RecordBatch, Errors, long,
long, long). Is there a better way to debug this?

The producer drops messages when there is a network issue; we verified this
by checking the topic's message counts.

Also, with an async send the only option is a completion callback, and there
even the RecordMetadata is empty, as expected, because there was no
communication with the server. How do we get the record itself back after all
the retries have happened, so that nothing is lost?
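One common way to keep the record available on failure is to capture it in the callback object at send time, so the failure branch still has the key and value even when RecordMetadata is empty. The sketch below is a minimal, Kafka-free model of that idea, written so it compiles without the Kafka jar: a BiConsumer<String, Exception> stands in for Callback.onCompletion(RecordMetadata, Exception), and the names RetainingCallback and failedRecords are hypothetical. In real code the same class would implement org.apache.kafka.clients.producer.Callback and hold the ProducerRecord.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;

public class RetainingCallbackDemo {

    // Hypothetical holding area for records whose send ultimately failed.
    // A concurrent queue is used because producer callbacks run on the
    // producer's I/O thread, not on the thread that called send().
    static final Queue<Map.Entry<String, String>> failedRecords =
            new ConcurrentLinkedQueue<>();

    // Stand-in for a Kafka Callback: the first argument plays the role of
    // RecordMetadata (here just a topic name, possibly null on failure).
    static final class RetainingCallback implements BiConsumer<String, Exception> {
        private final String key;
        private final String value;

        RetainingCallback(String key, String value) {
            // Capture the record at send time so it is still available
            // when the completion handler fires.
            this.key = key;
            this.value = value;
        }

        @Override
        public void accept(String metadata, Exception e) {
            if (e != null) {
                // Metadata may be empty here, but the record itself is kept:
                // park it so the application can resend or persist it later.
                failedRecords.add(new SimpleEntry<>(key, value));
            }
        }
    }

    public static void main(String[] args) {
        // Simulate one successful and one failed completion.
        new RetainingCallback("k1", "v1").accept("test-topic", null);
        new RetainingCallback("k2", "v2").accept(null, new RuntimeException("Batch Expired"));
        System.out.println(failedRecords); // prints [k2=v2]
    }
}
```

Draining failedRecords from an application thread, rather than resending inside the callback, keeps slow work off the producer's I/O thread.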
Our producer configuration:

reconnect.backoff.ms = 100
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
request.timeout.ms = 5000
acks = 1
batch.size = 16384
receive.buffer.bytes = 32768
retries = 10000000 <<<<<<<<<<<<<<<<
max.request.size = 1048576
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
linger.ms = 10
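
Producer code:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

    /*
     * Produce a record without waiting for the server. This includes a
     * callback that will print an error if something goes wrong.
     */
    public static void produceAsync(Producer<String, String> producer,
            String topic, String key, String value) {
        // Include the key; it was previously accepted but never used.
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(topic, key, value);
        producer.send(record, new DemoProducerCallback());
    }

    public static class DemoProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                // On a failed send the metadata can be null or empty,
                // so fall back to an empty topic name.
                System.out.println("Error producing to topic " +
                        ((recordMetadata != null) ? recordMetadata.topic() : ""));
                e.printStackTrace();
            }
        }
    }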
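When debugging, it can help to compare how long the configured retries could keep a batch alive against the timeouts in the same configuration. The sketch below rebuilds a subset of the settings listed above as a java.util.Properties (the form the KafkaProducer constructor accepts) and computes a rough lower bound: retries x retry.backoff.ms = 10,000,000 x 100 ms = 10^9 ms, roughly 11.6 days, against a request.timeout.ms of 5,000 ms. If sends fail within seconds, something other than exhausting the retries is ending them. The helper names producerProps and minRetryWindowMs are hypothetical.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // A subset of the settings from the message above, as string pairs.
    // A real producer would also need key.serializer and value.serializer.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("acks", "1");
        props.put("retries", "10000000");
        props.put("retry.backoff.ms", "100");
        props.put("request.timeout.ms", "5000");
        props.put("reconnect.backoff.ms", "100");
        props.put("max.in.flight.requests.per.connection", "5");
        props.put("batch.size", "16384");
        props.put("linger.ms", "10");
        props.put("buffer.memory", "33554432");
        return props;
    }

    // Lower bound on how long the configured retries could keep a batch
    // alive: each retry waits at least retry.backoff.ms before the next try.
    static long minRetryWindowMs(Properties props) {
        long retries = Long.parseLong(props.getProperty("retries"));
        long backoffMs = Long.parseLong(props.getProperty("retry.backoff.ms"));
        return retries * backoffMs;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        long window = minRetryWindowMs(props);
        long requestTimeout = Long.parseLong(props.getProperty("request.timeout.ms"));
        System.out.println("retry window >= " + window + " ms (~" + window / 86_400_000 + " days)");
        System.out.println("request.timeout.ms = " + requestTimeout + " ms");
    }
}
```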

Re: async producer retry behavior - at least once guarantee

Posted by R Krishna <kr...@gmail.com>.
Increasing reconnect.backoff.ms to 1000 ms and setting
BLOCK_ON_BUFFER_FULL_CONFIG to true did not help either. The messages are
simply lost.

I am disappointed to find that there is no way to handle messages that are
lost when the broker itself is not available, and that broker connection
failures are not covered by retries:
https://issues.apache.org/jira/browse/KAFKA-156

Slide 24 of
http://www.slideshare.net/jhols1/apache-kafka-reliability-guarantees-stratahadoop-nyc-2015
also shows that retries happen only if the drain response fails. Does this
include the case where there is no (null) response?
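To reason about that question, here is a simplified model of the kind of retry decision the slide describes: a failed batch is re-enqueued only while its error is retriable and attempts remain. It is a sketch under stated assumptions, not the Sender source; the enum ProduceOutcome, its retriable flags, and the method shouldRetry are hypothetical. The WARN line in the first message (NETWORK_EXCEPTION, "retrying (9999999 attempts left)") suggests that a dropped connection does reach a decision like this as an error response. Nothing in the model expires a batch by time, so the separate "Batch Expired" TimeoutException, seen while almost all retries remained, would have to come from a path that does not consult the retry count.

```java
public class RetryDecisionModel {

    // Hypothetical outcomes of one produce attempt. The retriable flags are
    // assumptions of this model, chosen to mirror the named Kafka errors.
    enum ProduceOutcome {
        SUCCESS(false),
        NETWORK_EXCEPTION(true),        // e.g. a dropped connection
        NOT_LEADER_FOR_PARTITION(true),
        MESSAGE_TOO_LARGE(false);

        final boolean retriable;

        ProduceOutcome(boolean retriable) {
            this.retriable = retriable;
        }
    }

    // A failed batch is re-enqueued only if its error is retriable and it
    // has been attempted fewer times than the configured retries.
    static boolean shouldRetry(ProduceOutcome outcome, int attempts, int retries) {
        return outcome.retriable && attempts < retries;
    }

    public static void main(String[] args) {
        int retries = 10_000_000;  // value from the configuration above
        System.out.println(shouldRetry(ProduceOutcome.NETWORK_EXCEPTION, 1, retries)); // true
        System.out.println(shouldRetry(ProduceOutcome.MESSAGE_TOO_LARGE, 1, retries)); // false
    }
}
```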

Let me try modifying some of these classes.




-- 
Radha Krishna, Proddaturi
253-234-5657