Posted to dev@kafka.apache.org by "Di Shang (JIRA)" <ji...@apache.org> on 2016/12/09 06:42:59 UTC

[jira] [Created] (KAFKA-4515) Async producer send not retrying on TimeoutException: Batch Expired

Di Shang created KAFKA-4515:
-------------------------------

             Summary: Async producer send not retrying on TimeoutException: Batch Expired
                 Key: KAFKA-4515
                 URL: https://issues.apache.org/jira/browse/KAFKA-4515
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 0.9.0.1
            Reporter: Di Shang


We are testing broker failure resiliency: we have a cluster of 3 brokers and a topic with 5 partitions and 2 replicas. We use the code below to continuously send messages while killing one of the brokers, to see whether any messages are lost.

{code:title=MyTest.java|borderStyle=solid}
    // needs: java.util.Properties,
    // org.apache.kafka.clients.producer.KafkaProducer,
    // org.apache.kafka.clients.producer.ProducerRecord
    static volatile KafkaProducer<Void, String> producer;

    public static void send(ProducerRecord<Void, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // handle exception with a manual retry
                // (note: this callback runs on the producer's I/O thread,
                // so sleeping here stalls all other in-flight sends)
                System.out.println("Error, resending...");
                exception.printStackTrace();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //send(record); // without this retry, msg would be lost
            } else if (metadata != null) {
                System.out.println("Sent " + record);
            } else {
                System.out.println("No exception and no metadata");
            }
        });
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "...");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", "100000");
        props.put("acks", "1");
        props.put("request.timeout.ms", "1000");
        producer = new KafkaProducer<>(props);

        Long i = 1L;
        while (true) {
            ProducerRecord<Void, String> record =
                new ProducerRecord<>("my-topic", i.toString());

            send(record);

            Thread.sleep(100);
            i++;
        }
    }
{code}

What we found is that when *request.timeout.ms* is set to a small value like 1000, killing a broker produces a few TimeoutException: Batch Expired errors in the send() callback. If we do not handle these with an explicit retry, as in the code above, those messages are lost.
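A safer shape for the manual retry is to bound it rather than recurse indefinitely from the callback. A minimal sketch of that idea, using a plain stdlib stand-in for the producer (the FlakyProducer class, its send() contract, and the failure counts are simplified assumptions for illustration, not the real client API):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the async producer: fails the first
// `failures` sends with a timeout, then succeeds.
class FlakyProducer {
    private final AtomicInteger remainingFailures;

    FlakyProducer(int failures) {
        this.remainingFailures = new AtomicInteger(failures);
    }

    // Returns null on success, or the exception the caller must handle,
    // mimicking the (metadata, exception) callback contract.
    Exception send(String record) {
        if (remainingFailures.getAndDecrement() > 0) {
            return new java.util.concurrent.TimeoutException("Batch Expired");
        }
        return null;
    }
}

public class RetrySketch {
    // Bounded manual retry: resend on failure, but give up after
    // maxRetries instead of recursing forever from the callback.
    static boolean sendWithRetry(FlakyProducer producer, String record, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            Exception e = producer.send(record);
            if (e == null) {
                return true;   // delivered
            }
            // In the real callback we would also check that the error
            // is actually retriable before resending.
        }
        return false;          // retries exhausted, message lost
    }

    public static void main(String[] args) {
        // 3 failures, 5 retries allowed: the message gets through.
        System.out.println(sendWithRetry(new FlakyProducer(3), "msg-1", 5));
        // 3 failures, only 1 retry allowed: the message is lost.
        System.out.println(sendWithRetry(new FlakyProducer(3), "msg-2", 1));
    }
}
```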

The documentation for *request.timeout.ms* says:
bq. The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

This makes me think that a TimeoutException should be retried implicitly according to the *retries* option, but that does not seem to happen.

Strangely, we also noticed that if *request.timeout.ms* is set long enough, e.g. the default 30000, then no messages are lost when killing a broker, even with *retries* set to 0.
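Based on that observation, a workaround on our side is simply to keep the request timeout at its default. A sketch of the configuration we would expect to avoid the Batch Expired errors (property keys are the real producer config names; the broker list and values are illustrative only, not a verified fix):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Illustrative settings: a generous request timeout so batches are
    // not expired while a new leader is being elected, plus retries for
    // errors that do go through the retry path.
    static Properties resilientConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        props.put("retries", "100000");
        // default value; small settings like 1000 triggered Batch Expired
        props.put("request.timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(resilientConfig().getProperty("request.timeout.ms"));
    }
}
```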

So it seems that the *retries* option does not apply in the broker-down scenario. There appears to be some other internal mechanism for handling broker failure and message retry, and that mechanism does not cover the TimeoutException case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)