Posted to users@kafka.apache.org by Neeraj Vaidya <ne...@yahoo.co.in.INVALID> on 2021/09/13 06:40:31 UTC

InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

Hi All,

My software versions:
Apache Kafka 2.7.0
Kafka Streams 2.6.0

I have a Kafka Streams application that consumes from a topic with 12 partitions. The incoming message rate on this topic is very low, perhaps 3-4 messages per minute, and some partitions receive no messages for more than 7 days.

Exactly 7 days after starting this application, I get the following exception, and the application shuts down without processing any more messages:

2021-09-10T12:21:59.636 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
2021-09-10T12:21:59.642 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: 
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN

After this, I can see that all 12 tasks (there are 12 partitions for all topics) are shut down, and this brings down the whole application.

I understand that transactional.id.expiration.ms (default 7 days) is likely causing the transactional id used by this thread/task to expire, but why does this specific thread/task not get fenced or respawned?
Why shut down the entire Streams application just because one task has been idle?
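To illustrate the timing, here is a simplified, hypothetical model (not Kafka's broker code) of the expiration rule: a transactional id with no transactional activity for longer than transactional.id.expiration.ms (default 604800000 ms, i.e. 7 days) becomes eligible for expiry. With exactly-once enabled, each task here uses its own producer and transactional id (the log shows transactionalId=mtx-caf-0_2 for task 0_2), so a task whose input partition stays idle for more than 7 days never refreshes it. The class name, method name and the last-commit timestamp are assumptions for illustration only.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified model of broker-side transactional.id expiration.
 * This is NOT Kafka's implementation; it only illustrates the time window.
 */
public class TxnIdExpirationModel {

    // Broker default for transactional.id.expiration.ms: 604800000 ms = 7 days.
    static final Duration DEFAULT_EXPIRATION = Duration.ofMillis(604_800_000L);

    /** True if the id has been idle for longer than the expiration window. */
    static boolean isExpired(Instant lastTransactionActivity, Instant now, Duration expiration) {
        return Duration.between(lastTransactionActivity, now).compareTo(expiration) > 0;
    }

    public static void main(String[] args) {
        // Hypothetical timeline: task 0_2 last committed a transaction here...
        Instant lastCommit = Instant.parse("2021-09-03T12:00:00Z");
        // ...and its next record arrives at the time of the error in the log.
        Instant nextRecord = Instant.parse("2021-09-10T12:21:59Z");
        System.out.println("mtx-caf-0_2 expired: "
                + isExpired(lastCommit, nextRecord, DEFAULT_EXPIRATION));
    }
}
```

With these hypothetical timestamps the idle gap is just over 7 days, so isExpired returns true. The real broker checks for expired ids periodically, so in practice expiry can happen somewhat after the 7-day mark.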

Is there a way to keep my application up and running without it shutting down?


Regards,
Neeraj

Re: InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

Posted by Neeraj Vaidya <ne...@yahoo.co.in.INVALID>.
 Following up on this email: I raised this as a defect on the Apache Kafka JIRA (KAFKA-13292).

Based on a suggestion from Matthias J. Sax, I upgraded my client libraries to 2.8.0 and wrote a custom StreamsUncaughtExceptionHandler that responds with REPLACE_THREAD when an InvalidPidMappingException is encountered. This works well: because EOS is enabled in my Streams application, the event/message currently being processed is first aborted.
A new thread is then started, and that new thread successfully processes the message.
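A minimal sketch of such a handler, assuming Kafka Streams 2.8.0 or later (where KIP-671 added StreamsUncaughtExceptionHandler and KafkaStreams#setUncaughtExceptionHandler). The author's actual handler is not shown in the thread; the class name ReplaceThreadOnPidMappingHandler is hypothetical. It walks the cause chain because, as the log in the original message shows, the InvalidPidMappingException reaches the thread wrapped in a StreamsException.

```java
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

/**
 * Hypothetical handler (requires Kafka Streams 2.8.0+): replaces the failed
 * StreamThread when the failure was caused by an expired transactional id,
 * and shuts down this client for anything else.
 */
public class ReplaceThreadOnPidMappingHandler implements StreamsUncaughtExceptionHandler {

    @Override
    public StreamThreadExceptionResponse handle(final Throwable exception) {
        // The InvalidPidMappingException is typically wrapped (e.g. in a StreamsException),
        // so inspect the whole cause chain, not only the top-level exception.
        for (Throwable t = exception; t != null; t = t.getCause()) {
            if (t instanceof InvalidPidMappingException) {
                // Start a replacement StreamThread; it creates new producers,
                // which re-initialize their transactions with the broker.
                return StreamThreadExceptionResponse.REPLACE_THREAD;
            }
        }
        // Unrelated failures: stop this KafkaStreams instance.
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    }
}
```

Register it before starting the application, e.g. `streams.setUncaughtExceptionHandler(new ReplaceThreadOnPidMappingHandler());` followed by `streams.start();`. Falling back to SHUTDOWN_CLIENT for everything else means unrelated errors still stop the instance instead of being retried by ever-new threads.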

My only outstanding question is: does this exception only apply to Streams applications that have state stores?
I have a Producer application that produces messages to the same topic but does not suffer from this InvalidPidMappingException.

Regards,
Neeraj