Posted to users@kafka.apache.org by Neeraj Vaidya <ne...@yahoo.co.in.INVALID> on 2022/04/23 11:55:52 UTC

org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch

Hi All,
My setup is shown in the attached JPEG file.
In my setup, I have a stretch cluster spread across two geographically distant data centres (DC1 and DC2). The network latency between them, measured as ping round-trip time, is about 50 ms.
There are 4 brokers in each DC.
Each data centre runs a Kafka producer application and a KStream application.
My test involves the following steps:
1. Make the producer in DC1 produce records at a rate of about 3000 messages per second.
2. Shut down all brokers in DC2 for about 30 minutes to simulate a site outage.
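For a sense of scale, the test parameters imply roughly how many records the DC2 replicas have to fetch once their brokers return. This is a back-of-the-envelope sketch that assumes the produce rate stays constant at about 3000 messages per second for the full 30 minutes and ignores anything the KStream application itself writes; the class and method names are hypothetical.

```java
// Back-of-the-envelope estimate of records written while DC2 was down.
// Assumptions: constant produce rate for the whole outage; only the DC1
// producer's records are counted (KStream output/changelog writes ignored).
public class OutageBacklog {

    // Records produced during an outage of the given length.
    public static long backlogRecords(long messagesPerSecond, long outageMinutes) {
        return messagesPerSecond * outageMinutes * 60L;
    }

    public static void main(String[] args) {
        long backlog = backlogRecords(3000, 30);
        // 3000 msg/s * 1800 s = 5,400,000 records, spread across the topic's partitions
        System.out.println("Records written while DC2 was down: " + backlog);
    }
}
```

That is about 5.4 million records spread across the input topic's partitions, each of which the DC2 follower replicas must fetch across the roughly 50 ms inter-DC link before they are back in sync.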

When I restart the brokers in DC2, I see errors in the logs of my KStream application. (Note: the Kafka producer application does not seem to suffer from any such errors.)
As expected, the replica lag on the DC2 brokers has grown, and it gradually decreases as the DC2 brokers fetch records from the DC1 brokers.
However, the KStream application shuts down and cannot be started again until the DC2 replicas of all partitions of the topic it consumes from have caught up.
The KStream application logs the following errors and shuts down soon after:

org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:707)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:693)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:640)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:574)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:561)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:561)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$3(Sender.java:785)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2022-04-21T16:27:08.469 [mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] INFO  c.m.a.k.t.SessionBasedDataUsageAccumulator - MSG=Shutting Down
2022-04-21T16:27:08.470 [mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] INFO  o.a.k.s.p.internals.StreamTask - MSG=stream-thread [mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] task [0_143] Suspended running
2022-04-21T16:27:08.475 [mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] INFO  o.a.k.clients.consumer.KafkaConsumer - MSG=[Consumer clientId=mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
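The exception message refers to producer-epoch fencing. As a simplified illustration (a toy model written for this note, not Kafka's broker code; every class and method name here is hypothetical), a broker can be thought of as remembering the latest epoch for each producer ID and rejecting any write that carries an older epoch:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of producer-epoch fencing, for illustration only.
// It is NOT Kafka's broker implementation; all names are hypothetical.
public class EpochFencingModel {

    // Stand-in for org.apache.kafka.common.errors.InvalidProducerEpochException.
    public static class InvalidProducerEpoch extends RuntimeException {
        public InvalidProducerEpoch(String message) {
            super(message);
        }
    }

    // Latest epoch this "broker" has seen for each producer ID.
    private final Map<Long, Short> latestEpoch = new HashMap<>();

    // Bump the epoch for a producer ID and return the new value (first bump yields 0).
    // In Kafka an epoch bump happens, for example, when a new producer instance
    // initialises with the same transactional.id, or when the transaction
    // coordinator aborts a transaction that has timed out.
    public short bumpEpoch(long producerId) {
        short next = (short) (latestEpoch.getOrDefault(producerId, (short) -1) + 1);
        latestEpoch.put(producerId, next);
        return next;
    }

    // Accept a write only if its epoch is not older than the latest known epoch.
    public void append(long producerId, short epoch) {
        short current = latestEpoch.getOrDefault(producerId, (short) -1);
        if (epoch < current) {
            throw new InvalidProducerEpoch("Producer attempted to produce with an old epoch.");
        }
        latestEpoch.put(producerId, epoch); // remember the (possibly newer) epoch
    }

    public static void main(String[] args) {
        EpochFencingModel broker = new EpochFencingModel();
        long pid = 42L;                         // hypothetical producer ID
        short oldEpoch = broker.bumpEpoch(pid); // epoch 0
        broker.append(pid, oldEpoch);           // accepted
        broker.bumpEpoch(pid);                  // epoch is now 1
        try {
            broker.append(pid, oldEpoch);       // stale producer still using epoch 0
        } catch (InvalidProducerEpoch e) {
            // prints "Rejected: Producer attempted to produce with an old epoch."
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In this model, a producer that keeps sending with the epoch it held before the bump is rejected, which mirrors the "producer is fenced" wording in the Streams log above.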