Posted to dev@kafka.apache.org by "huxi (JIRA)" <ji...@apache.org> on 2016/12/09 08:34:59 UTC

[jira] [Commented] (KAFKA-4515) Async producer send not retrying on TimeoutException: Batch Expired

    [ https://issues.apache.org/jira/browse/KAFKA-4515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15734702#comment-15734702 ] 

huxi commented on KAFKA-4515:
-----------------------------

Did you see many "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error" entries in the producer log with retries enabled?
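
One way to check is to scan the producer log for that message. Below is a rough sketch in plain Java. The class name RetryLogScan and the sample log lines are made up for illustration, and the regex assumes the first three {} placeholders render as a number, a topic-partition name, and a number.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: counts producer retry warnings in log lines.
class RetryLogScan {
    // Built from the message template quoted above. How the placeholders
    // are rendered in a real log is an assumption.
    static final Pattern RETRY_LINE = Pattern.compile(
        "Got error produce response with correlation id (\\d+) on topic-partition (\\S+), "
            + "retrying \\((\\d+) attempts left\\)");

    // Returns how many of the given lines contain the retry warning.
    static long countRetries(List<String> logLines) {
        return logLines.stream()
            .filter(line -> RETRY_LINE.matcher(line).find())
            .count();
    }

    public static void main(String[] args) {
        // Made-up sample lines; replace with the lines of the real producer log.
        List<String> sample = Arrays.asList(
            "WARN Got error produce response with correlation id 42 on topic-partition "
                + "my-topic-3, retrying (99999 attempts left). Error: NETWORK_EXCEPTION",
            "ERROR org.apache.kafka.common.errors.TimeoutException: Batch Expired");
        System.out.println("retry warnings: " + countRetries(sample));
    }
}
```

If the count stays at zero while the callback still reports Batch Expired, the failed batches probably never went through the retry path at all.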

> Async producer send not retrying on TimeoutException: Batch Expired
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4515
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.9.0.1
>            Reporter: Di Shang
>
> We are testing broker failure resiliency. We have a cluster of 3 brokers and a topic with 5 partitions and 2 replicas. The replicas are evenly distributed, and every broker leads at least one partition. We use this code to send messages continuously, then kill one of the brokers to see whether any messages are lost. 
> {code:title=MyTest.java|borderStyle=solid}
>     static volatile KafkaProducer<Void, String> producer;
>     public static void send(ProducerRecord<Void, String> record) {
>         producer.send(record, (metadata, exception) -> {
>             if (exception != null) {
>                 // handle exception with manual retry
>                 System.out.println("Error, resending...");
>                 exception.printStackTrace();
>                 try {
>                     Thread.sleep(100);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 //send(record); // without this retry, msg would be lost
>             } else if (metadata != null) {
>                 System.out.println("Sent " + record);
>             } else {
>                 System.out.println("No exception and no metadata");
>             }
>         });
>     }
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "...");
>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("retries", "100000");
>         props.put("acks", "1");
>         props.put("request.timeout.ms", "1000");
>         producer = new KafkaProducer<>(props);
>         Long i = 1L;
>         while (true) {
>             ProducerRecord<Void, String> record =
>                 new ProducerRecord<>("my-topic", i.toString());
>             send(record);
>             Thread.sleep(100);
>             i++;
>         }
>     }
> {code}
> We found that when *request.timeout.ms* is set to a small value such as 1000, killing a broker produces a few TimeoutException: Batch Expired errors in the send() callback. If we don't handle these with an explicit retry, as in the commented-out send(record) call above, those messages are lost. 
> The documentation for *request.timeout.ms* says:
> bq. The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
> This makes me think that a TimeoutException should be retried implicitly according to the *retries* option, but that doesn't seem to happen. 
> Strangely, we also noticed that if *request.timeout.ms* is long enough, such as the default 30000, we don't lose any messages when killing a broker, even with *retries* set to 0. 
> So it seems to me that the *retries* option is not working in the broker-down scenario. There seems to be some other internal mechanism that handles broker failure and message retry, and this mechanism doesn't work when there is a TimeoutException.
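
For reference, the manual resend in the callback above could be bounded so that a record is not resent forever. This is only a sketch in plain Java. AsyncSender is a hypothetical stand-in for the producer's send(record, callback), not a Kafka API, and the resend limit is an assumption rather than anything the producer provides.

```java
import java.util.function.Consumer;

// Hypothetical sketch of a bounded manual resend; none of these names are Kafka APIs.
class BoundedResend {
    // Stand-in for an asynchronous send: the callback receives null on
    // success or the exception on failure.
    interface AsyncSender<R> {
        void send(R record, Consumer<Exception> callback);
    }

    // Sends the record and resends it on failure, at most resendsLeft more times.
    // onDone receives null on success or the last exception once resends run out.
    static <R> void sendWithResend(AsyncSender<R> sender, R record,
                                   int resendsLeft, Consumer<Exception> onDone) {
        sender.send(record, error -> {
            if (error == null) {
                onDone.accept(null);
            } else if (resendsLeft > 0) {
                sendWithResend(sender, record, resendsLeft - 1, onDone);
            } else {
                onDone.accept(error);
            }
        });
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Fake sender that fails twice with a made-up error, then succeeds.
        AsyncSender<String> flaky = (record, callback) -> {
            attempts[0]++;
            callback.accept(attempts[0] <= 2 ? new RuntimeException("Batch Expired") : null);
        };
        sendWithResend(flaky, "1", 3, error ->
            System.out.println(error == null
                ? "sent after " + attempts[0] + " attempts"
                : "gave up: " + error.getMessage()));
    }
}
```

Resending from the callback can reorder messages relative to later sends, so this may not suit topics where ordering matters.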



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)