You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Tomasz Gac <to...@empirica.io.INVALID> on 2022/11/15 10:14:20 UTC

Cannot send in state COMMITTING_TRANSACTION

Hi group,

We've encountered a problem during regular operation of the kafka-streams
application. While processing a record we received the following error.
There's very little documentation on this kind of problem but I've gathered
that it's a synchronization issue between kafka consumer and producer. Have
you ever encountered it before?

My questions are: is this a matter of misconfiguration, or rather a bug?
Has it been fixed?

We're using Java 8, kafka-client 2.8.1 with kafka streams version 2.8.1
against the kafka broker version 2.1.1. We are running it as an OSGI bundle
with dependencies packaged within the bundle.

Thank you.

2022-10-18T01:20:19,665 | ERROR |
TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1 |
TaskManager | 218 - org.apache.servicemix.bundles.kafka-clients - 2.8.1.1 |
stream-thread
[TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1] Failed
to process stream task 0_2 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in
process. taskId=0_2, processor=KSTREAM-SOURCE-0000000002,
topic=ORDER_BOOK_BEST, partition=2, offset=13452067,
stacktrace=java.lang.IllegalStateException: Cannot call send in state
COMMITTING_TRANSACTION at
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
at
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
at
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
at
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
at
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
at
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
~[!/:?] at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
[!/:?] at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
[!/:?] at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
[!/:?] at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
[!/:?] Caused by: java.lang.IllegalStateException: Cannot call send in
state COMMITTING_TRANSACTION at
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
~[!/:?] at
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
~[!/:?] at
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
~[!/:?] at
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
~[!/:?] at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
~[!/:?] at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
~[!/:?] at
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
~[!/:?] at
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
~[!/:?] ... 4 more

Re: Cannot send in state COMMITTING_TRANSACTION

Posted by Tomasz Gac <to...@empirica.io.INVALID>.
Hi Sophie,

Thank you for your response.

Unfortunately, I am unable to file the report because it requires
membership I don't have.
However, I obtained the permission to share logs with you.

We had 5 different kafka streams applications running in parallel on two
servers: indicators1 and indicators2.
I'm attaching logs from both servers. The error occurred on indicators1 and
had not occurred since.


śr., 16 lis 2022 o 04:31 Sophie Blee-Goldman <so...@confluent.io.invalid>
napisał(a):

> Interesting, this does look like it could be a bug in Streams and I'm not
> aware of
> any known or already-fixed issues resembling this. Could you file a bug
> report
> over here <
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA>
> and
> include as much context/information as possible? Providing logs
> from around the time leading up to this exception in particular would
> greatly
> help in debugging this
>
> On Tue, Nov 15, 2022 at 2:15 AM Tomasz Gac <tomasz.gac@empirica.io
> .invalid>
> wrote:
>
> > Hi group,
> >
> > We've encountered a problem during regular operation of the kafka-streams
> > application. While processing a record we received the following error.
> > There's very little documentation on this kind of problem but I've
> gathered
> > that it's a synchronization issue between kafka consumer and producer.
> Have
> > you ever encountered it before?
> >
> > My questions are: is this a matter of misconfiguration, or rather a bug?
> > Has it been fixed?
> >
> > We're using Java 8, kafka-client 2.8.1 with kafka streams version 2.8.1
> > against the kafka broker version 2.1.1. We are running it as an OSGI
> bundle
> > with dependencies packaged within the bundle.
> >
> > Thank you.
> >
> > 2022-10-18T01:20:19,665 | ERROR |
> > TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1 |
> > TaskManager | 218 - org.apache.servicemix.bundles.kafka-clients -
> 2.8.1.1 |
> > stream-thread
> > [TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1]
> Failed
> > to process stream task 0_2 due to the following error:
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> > process. taskId=0_2, processor=KSTREAM-SOURCE-0000000002,
> > topic=ORDER_BOOK_BEST, partition=2, offset=13452067,
> > stacktrace=java.lang.IllegalStateException: Cannot call send in state
> > COMMITTING_TRANSACTION at
> >
> >
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
> > at
> >
> >
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
> > at
> >
> >
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > at
> >
> >
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > at
> >
> >
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > at
> >
> >
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > at
> >
> >
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > at
> >
> >
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> > at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
> > [!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> > [!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
> > [!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> > [!/:?] Caused by: java.lang.IllegalStateException: Cannot call send in
> > state COMMITTING_TRANSACTION at
> >
> >
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> > ~[!/:?] at
> >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
> > ~[!/:?] ... 4 more
> >
>

Re: Cannot send in state COMMITTING_TRANSACTION

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Interesting, this does look like it could be a bug in Streams and I'm not
aware of
any known or already-fixed issues resembling this. Could you file a bug
report
over here <https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA>
and
include as much context/information as possible? Providing logs
from around the time leading up to this exception in particular would
greatly
help in debugging this

On Tue, Nov 15, 2022 at 2:15 AM Tomasz Gac <to...@empirica.io.invalid>
wrote:

> Hi group,
>
> We've encountered a problem during regular operation of the kafka-streams
> application. While processing a record we received the following error.
> There's very little documentation on this kind of problem but I've gathered
> that it's a synchronization issue between kafka consumer and producer. Have
> you ever encountered it before?
>
> My questions are: is this a matter of misconfiguration, or rather a bug?
> Has it been fixed?
>
> We're using Java 8, kafka-client 2.8.1 with kafka streams version 2.8.1
> against the kafka broker version 2.1.1. We are running it as an OSGI bundle
> with dependencies packaged within the bundle.
>
> Thank you.
>
> 2022-10-18T01:20:19,665 | ERROR |
> TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1 |
> TaskManager | 218 - org.apache.servicemix.bundles.kafka-clients - 2.8.1.1 |
> stream-thread
> [TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1] Failed
> to process stream task 0_2 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=0_2, processor=KSTREAM-SOURCE-0000000002,
> topic=ORDER_BOOK_BEST, partition=2, offset=13452067,
> stacktrace=java.lang.IllegalStateException: Cannot call send in state
> COMMITTING_TRANSACTION at
>
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
> at
>
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
> at
>
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> at
>
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
> at
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
> at
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
> at
>
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
> at
>
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
> [!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> [!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
> [!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> [!/:?] Caused by: java.lang.IllegalStateException: Cannot call send in
> state COMMITTING_TRANSACTION at
>
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
> ~[!/:?] at
>
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
> ~[!/:?] at
>
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
> ~[!/:?] ... 4 more
>