You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Sreeram <sr...@gmail.com> on 2017/06/24 14:52:39 UTC

Help needed with Batch Expired exception

Hi,

  I am trying to send messages (synchronous) to a kafka cluster (lets
call it A). I get 'Batch Expired Exception' very frequently.
Also the average time taken per send is very high around 5 seconds.

However for the same code when I send messages to a different kafka
cluster B (with same network latency), average time per send is 15ms
which is acceptable.

I have checked the kafka server logs in cluster A and nothing looks abnormal.

ProducerPerformance tool is returning below good results for cluster A
which makes it confusing.

kafka-run-class org.apache.kafka.tools.ProducerPerformance --topic
testREPL  --num-records 50000 --record-size 1000 --throughput 100
--producer-props acks=1 bootstrap.servers=<broker list>
batch.size=8196

output: 50000 records sent, 100.000400 records/sec (0.10 MB/sec), 0.52
ms avg latency, 179.00 ms max latency, 1 ms 50th, 1 ms 95th, 1 ms
99th, 3 ms 99.9th.

My producer properties look like this

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers);
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

producer send looks like this
ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>(topic, partition,
Integer.toString(partition), payload);
producer.send(producerRecord).get();
I have spent very long time trying to debug but no luck - I appreciate
any help to help fix this.

Below is sample stack trace when I am publishing to cluster A

java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.TimeoutException: Batch Expired
org.apache.kafka.common.errors.TimeoutException: Batch Expired
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
10.0.128.115 org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
10.0.128.115 org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)

Thanks,
Sreeram