Posted to users@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2018/06/05 22:35:41 UTC

Re: UnknownProducerIdException in Kafka streams when enabling exactly once

Sorry for late reply.

> The source stream contains millions of messages produced over several
> months.

What is the retention time of the output topic? If it is shorter than the
age of the messages (whose timestamps I expect to be multiple months old),
the data would be deleted quickly after the write, because it is already
older than the retention time. (Note that Kafka Streams preserves the
timestamps of the input records and sets them as the timestamps of the
output records.) For example, the record in your stack trace carries
timestamp 1519200902670, i.e. late February 2018, so with a default 7-day
retention it is eligible for deletion as soon as it is written to the
output topic.

At the same time, the producerId is stored in the output topic, and if the
topic segments are deleted, the producerId gets lost and thus a later
write fails.

Thus, you would need to either increase the retention time of the output
topic, or maybe configure AppendTime instead of the default CreateTime. Of
course, configuring AppendTime will alter the semantics of the output data,
as the records get new timestamps assigned.
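
Both settings are topic-level configs (retention.ms and
message.timestamp.type). A rough sketch of applying them with the Java
AdminClient follows; the bootstrap servers and the one-year retention value
are placeholders, and note that alterConfigs() replaces the topic's dynamic
config set as a whole:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class OutputTopicConfig {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // placeholder bootstrap servers
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic = new ConfigResource(
                    ConfigResource.Type.TOPIC, "exactly-once-test-topic-v2");

                // Keep data (and the producerId metadata stored with it) long
                // enough to cover the oldest input timestamps; one year here.
                ConfigEntry retention =
                    new ConfigEntry("retention.ms", "31536000000");
                // Alternative: let the broker assign timestamps on append
                // instead of keeping the producer-supplied CreateTime
                // (changes the output semantics).
                ConfigEntry timestampType =
                    new ConfigEntry("message.timestamp.type", "LogAppendTime");

                admin.alterConfigs(Collections.singletonMap(
                        topic, new Config(Arrays.asList(retention, timestampType))))
                    .all().get();
            }
        }
    }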



-Matthias

On 4/17/18 10:52 AM, Odin wrote:
> After enabling exactly once processing on a Kafka streams application, the following error appears in the logs:
> 
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer
> due to the following error:
> 
> org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort
> sending since an error caught with a previous record (key 222222 value
> some-value timestamp 1519200902670) to topic exactly-once-test-topic-
> v2 due to This exception is raised by the broker if it could not
> locate the producer metadata associated with the producerId in
> question. This could happen if, for instance, the producer's records
> were deleted because their retention time had elapsed. Once the last
> records of the producerId are removed, the producer's metadata is
> removed from the broker, and future appends by the producer will
> return this exception.
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>   at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>   at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
>   at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
>   at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>   at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>   at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
>   at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>   at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
>   at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.UnknownProducerIdException
> 
> We've reproduced the issue with a minimal test case where we move messages from a source stream to another stream without any transformation. The source stream contains millions of messages produced over several months. The KafkaStreams object is created with the following StreamsConfig:
> 
> - StreamsConfig.PROCESSING_GUARANTEE_CONFIG = "exactly_once"
> - StreamsConfig.APPLICATION_ID_CONFIG = "Some app id"
> - StreamsConfig.NUM_STREAM_THREADS_CONFIG = 1
> - ProducerConfig.BATCH_SIZE_CONFIG = 102400
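> 
> For reference, the pass-through topology is essentially the following
> sketch (application id, bootstrap servers, topic names and serdes are
> placeholders rather than the exact test code):
> 
>     import java.util.Properties;
> 
>     import org.apache.kafka.clients.producer.ProducerConfig;
>     import org.apache.kafka.common.serialization.Serdes;
>     import org.apache.kafka.streams.KafkaStreams;
>     import org.apache.kafka.streams.StreamsBuilder;
>     import org.apache.kafka.streams.StreamsConfig;
> 
>     public class PassThroughApp {
>         public static void main(String[] args) {
>             Properties props = new Properties();
>             props.put(StreamsConfig.APPLICATION_ID_CONFIG, "some-app-id");
>             props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>             props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>                 StreamsConfig.EXACTLY_ONCE);
>             props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
>             props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG),
>                 102400);
>             props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>                 Serdes.String().getClass());
>             props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>                 Serdes.String().getClass());
> 
>             // Copy records unchanged from the source topic to the output topic.
>             StreamsBuilder builder = new StreamsBuilder();
>             builder.stream("source-topic").to("exactly-once-test-topic-v2");
> 
>             KafkaStreams streams = new KafkaStreams(builder.build(), props);
>             streams.start();
>             Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>         }
>     }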
> 
> The app is able to process some messages before the exception occurs. After restarting the app, the processing of messages continues for a while before the exception occurs again.
> 
> Context information:
> 
> - we're running a 5 node Kafka 1.1.0 cluster with 5 zookeeper nodes.
> - there are multiple instances of the app running
> 
> 1. Has anyone seen this problem before or can give us any hints about what might be causing this behaviour?
> 
> 2. How should problems like this be treated?
> 
> Sincerely
> Odin
>