Posted to dev@kafka.apache.org by "Lucas Brutschy (Jira)" <ji...@apache.org> on 2024/01/03 18:07:00 UTC

[jira] [Created] (KAFKA-16077) State updater fails to close task when input partitions are updated

Lucas Brutschy created KAFKA-16077:
--------------------------------------

             Summary: State updater fails to close task when input partitions are updated
                 Key: KAFKA-16077
                 URL: https://issues.apache.org/jira/browse/KAFKA-16077
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.0
            Reporter: Lucas Brutschy


There is a race condition in the state updater that can lead to the following sequence of events:
 # An active task is owned by the state updater.
 # We get fenced. We recreate the producer, whose transactions are now uninitialized. We ask the state updater to hand back the task and register a pending action to close the task cleanly once it is handed back.
 # We receive a new assignment with updated input partitions for the task. The task is still owned by the state updater, so we ask the state updater again to hand it back and register a pending action to update its input partitions. This second pending action overwrites the first one.
 # The state updater hands the task back. We update its input partitions but never close it cleanly, because the close-clean pending action was overwritten.
 # The task is now in an initialized state, but the underlying producer does not have transactions initialized.
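In steps 3 and 4, the close-clean request is lost because the second request replaces the first. The sketch below is a minimal model of that failure. It assumes a per-task map of pending actions. PendingAction, SingleSlotPendingActions and AccumulatingPendingActions are hypothetical names for illustration only. They are not the Kafka Streams classes, and this report does not describe the actual fix. The single-slot variant stores one action per task, so a later request silently replaces an earlier one. The accumulating variant is one possible way to keep both requests.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical action types; illustrative only, not the Kafka Streams enum.
enum PendingAction { CLOSE_CLEAN, UPDATE_INPUT_PARTITIONS }

// Buggy model: one action slot per task, so a later request overwrites an earlier one.
class SingleSlotPendingActions {
    private final Map<String, PendingAction> actions = new HashMap<>();

    void add(String taskId, PendingAction action) {
        actions.put(taskId, action); // silently replaces any earlier pending action
    }

    // Called when the state updater hands the task back; returns the actions to run.
    Set<PendingAction> drain(String taskId) {
        PendingAction a = actions.remove(taskId);
        return a == null ? EnumSet.noneOf(PendingAction.class) : EnumSet.of(a);
    }
}

// Alternative model: accumulate all requested actions per task so none is lost.
class AccumulatingPendingActions {
    private final Map<String, EnumSet<PendingAction>> actions = new HashMap<>();

    void add(String taskId, PendingAction action) {
        actions.computeIfAbsent(taskId, id -> EnumSet.noneOf(PendingAction.class)).add(action);
    }

    // Returns every action requested while the state updater owned the task.
    Set<PendingAction> drain(String taskId) {
        EnumSet<PendingAction> a = actions.remove(taskId);
        return a == null ? EnumSet.noneOf(PendingAction.class) : a;
    }
}
```

Suppose we request CLOSE_CLEAN and then UPDATE_INPUT_PARTITIONS for task 1_0. Draining the single-slot variant then yields only UPDATE_INPUT_PARTITIONS, which is exactly the missing clean close. The accumulating variant yields both, so the caller can update the input partitions and still close the task cleanly.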

This can lead to an exception like the following, taken from streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000005, topic=node-name-repartition, partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
	at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
	at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
	at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:847)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
	at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:847)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
	at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
	at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1919)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:953)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)

This affects EOS v2 (processing.guarantee=exactly_once_v2) only.
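The IllegalStateException above comes from the producer's transaction state machine. It rejects beginTransaction() because the recreated producer never had its transactions initialized (KafkaProducer#initTransactions() was not called on it). The toy model below reproduces only that check. It is a hypothetical simplification, not kafka-clients' TransactionManager, and it skips the real client's intermediate states. ToyTransactionalProducer and TxnState are illustrative names. A fresh producer starts UNINITIALIZED, initTransactions() moves it to READY, and only READY may enter IN_TRANSACTION.

```java
// Hypothetical, simplified transaction states; the real client has more.
enum TxnState { UNINITIALIZED, READY, IN_TRANSACTION }

// Toy model of a transactional producer; not the kafka-clients implementation.
class ToyTransactionalProducer {
    private TxnState state = TxnState.UNINITIALIZED; // a newly (re)created producer

    TxnState state() {
        return state;
    }

    // Models initTransactions(): must run once before the first transaction.
    void initTransactions() {
        transitionTo(TxnState.READY, TxnState.UNINITIALIZED);
    }

    // Models beginTransaction(): only legal once transactions are initialized.
    void beginTransaction() {
        transitionTo(TxnState.IN_TRANSACTION, TxnState.READY);
    }

    // Models commitTransaction(): returns the producer to READY.
    void commitTransaction() {
        transitionTo(TxnState.READY, TxnState.IN_TRANSACTION);
    }

    // Rejects any transition whose source state is not the required one.
    private void transitionTo(TxnState target, TxnState requiredSource) {
        if (state != requiredSource) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + state + " to state " + target);
        }
        state = target;
    }
}
```

In the race described above, the task keeps processing with a producer that is still UNINITIALIZED. The first send therefore calls beginTransaction() on it and fails with the same transition error as in the log.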
