Posted to users@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2019/10/11 05:36:52 UTC

Re: UnknownProducerIdException every few minutes

Maybe related to https://issues.apache.org/jira/browse/KAFKA-7190

It's fixed in the upcoming 2.4 release.


-Matthias

On 9/25/19 3:08 PM, Alessandro Tagliapietra wrote:
> Hello everyone,
> 
> I have another problem, unrelated to the previous one, so I'm creating
> another thread.
> We have a streams application that reads from a topic, reads from/writes to
> 3 different stores, and writes the output to another topic, all with the
> exactly-once processing guarantee enabled.
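> 
> (For context, the exactly-once part of our setup is roughly the following
> sketch; the broker address is just a placeholder and the application id is
> the one you can see in the logs below:)
> 
>     import java.util.Properties;
>     import org.apache.kafka.streams.StreamsConfig;
> 
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-orders-pipeline");
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     // enable the exactly-once processing guarantee for the whole topology
>     props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);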
> 
> Due to a bug in the logic of the producer that sends messages to the topic
> our stream reads from, the same data was being sent over and over with
> increasing timestamps.
> For example, it would send data with timestamps starting at today at 12:00
> and increasing until 15:00, then start again at 12:00, over and over,
> creating a cycle in the timestamps.
> 
> In our stream configuration we're using a timestamp extractor that reads
> the timestamp from the message's JSON payload.
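> 
> (The extractor is essentially the sketch below; the class name, the
> "timestamp" field name and the byte[] value type are assumptions for
> illustration, and it's registered via
> StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG:)
> 
>     import com.fasterxml.jackson.databind.JsonNode;
>     import com.fasterxml.jackson.databind.ObjectMapper;
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.streams.processor.TimestampExtractor;
> 
>     public class JsonTimestampExtractor implements TimestampExtractor {
>         private static final ObjectMapper MAPPER = new ObjectMapper();
> 
>         @Override
>         public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
>             try {
>                 // read the event time from the "timestamp" field of the JSON payload
>                 JsonNode json = MAPPER.readTree((byte[]) record.value());
>                 return json.get("timestamp").asLong();
>             } catch (Exception e) {
>                 // fall back to the partition time if the payload can't be parsed
>                 return partitionTime;
>             }
>         }
>     }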
> 
> Since this buggy producer was sending data in batches every few minutes,
> our stream was dying every few minutes with this error:
> 
> [kafka-producer-network-thread |
> my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1-0_0-producer]
> ERROR org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
> task [0_0] Error sending record to topic
> my-app-orders-pipeline-order-unique-emitter-changelog due to This exception
> is raised by the broker if it could not locate the producer metadata
> associated with the producerId in question. This could happen if, for
> instance, the producer's records were deleted because their retention time
> had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.; No more records will be sent and no
> more offsets will be recorded for this task. Enable TRACE logging to view
> failed record key and value.
> org.apache.kafka.common.errors.UnknownProducerIdException: This exception
> is raised by the broker if it could not locate the producer metadata
> associated with the producerId in question. This could happen if, for
> instance, the producer's records were deleted because their retention time
> had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.
> [my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1]
> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
> stream-thread
> [my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1]
> Failed to commit stream task 0_0 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending
> since an error caught with a previous record (key 59 value [B@16e88816
> timestamp 1568779124999) to topic
> my-app-orders-pipeline-order-unique-emitter-changelog due to
> org.apache.kafka.common.errors.UnknownProducerIdException: This exception
> is raised by the broker if it could not locate the producer metadata
> associated with the producerId in question. This could happen if, for
> instance, the producer's records were deleted because their retention time
> had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:138)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:201)
> at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1318)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:720)
> at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:706)
> at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:663)
> at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:585)
> at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:73)
> at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:789)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:331)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
> at java.lang.Thread.run(Thread.java:748)
> 
> Sometimes the error referred to the first store's changelog in our stream,
> sometimes to another one.
> I've found this issue https://issues.apache.org/jira/browse/KAFKA-6817 but
> it doesn't seem to be related because:
>  - the timestamps used are from today, so well within the 4-week retention
> set for the changelog topic (which also uses compact as its cleanup policy,
> not delete; one way to double-check those settings is sketched below)
>  - the stream correctly processed data for that key over and over, so I
> don't think the producer ID could have expired multiple times within minutes
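> 
> (For reference, this is roughly how the changelog topic settings can be
> double-checked; the class name and broker address are placeholders, and a
> kafka-configs/kafka-topics describe would show the same information:)
> 
>     import java.util.Collections;
>     import java.util.Properties;
>     import org.apache.kafka.clients.admin.AdminClient;
>     import org.apache.kafka.clients.admin.AdminClientConfig;
>     import org.apache.kafka.clients.admin.Config;
>     import org.apache.kafka.common.config.ConfigResource;
> 
>     public class DescribeChangelogConfig {
>         public static void main(String[] args) throws Exception {
>             Properties props = new Properties();
>             props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>             try (AdminClient admin = AdminClient.create(props)) {
>                 ConfigResource changelog = new ConfigResource(ConfigResource.Type.TOPIC,
>                     "my-app-orders-pipeline-order-unique-emitter-changelog");
>                 // fetch the topic's effective configuration from the brokers
>                 Config config = admin.describeConfigs(Collections.singleton(changelog))
>                     .all().get().get(changelog);
>                 System.out.println("cleanup.policy = " + config.get("cleanup.policy").value());
>                 System.out.println("retention.ms   = " + config.get("retention.ms").value());
>             }
>         }
>     }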
> 
> Any help is very much appreciated.
> 
> --
> Alessandro Tagliapietra
>