Posted to users@kafka.apache.org by Alisson Sales <al...@gmail.com> on 2019/08/22 23:12:20 UTC

Kafka Streams and broker compatibility

Hi all, I've just upgraded a project from kafka-streams 2.2.0 to 2.2.1 and
hit the error shown at the end of this message.

I was using Kafka broker 0.10.2.1; after upgrading the broker to 0.11, the
error no longer occurs.

My question here is: where is the best place we can find the required
minimum broker version for the kafka-streams version one is using?

This is not clear to me and the
https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix wiki
page seems outdated.

Thanks in advance

Exception in thread "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed to flush state store KTABLE-SUPPRESS-STATE-STORE-0000000009
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_2] Abort sending since an error caught with a previous record (key A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db timestamp null) to topic streams-batch-opens-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog due to java.lang.IllegalArgumentException: Magic v1 does not support record headers
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:244)
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.logValue(InMemoryTimeOrderedKeyValueBuffer.java:284)
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.flush(InMemoryTimeOrderedKeyValueBuffer.java:266)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
    ... 10 more
Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
    at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:412)
    at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:451)
    at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:508)
    at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:531)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:106)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:224)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:907)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)

Re: Kafka Streams and broker compatibility

Posted by Alisson Sales <al...@gmail.com>.
Awesome, thanks for clarifying :)

On Tue, Aug 27, 2019 at 1:08 PM Guozhang Wang <wa...@gmail.com> wrote:

> Right, the fix itself actually add more headers even if there were none
> from the source topics, and hence cause old versioned brokers to fail. But
> theoretically speaking, as long as the streams clients are version 0.11.0+
> the broker version should be 0.11.0+ for various features that may require
> higher message format (eos, suppression, etc).

Re: Kafka Streams and broker compatibility

Posted by Guozhang Wang <wa...@gmail.com>.
Right, the fix itself actually adds headers even if there were none
from the source topics, and hence causes brokers on older versions to fail.
But theoretically speaking, as long as the Streams clients are version
0.11.0+, the broker version should be 0.11.0+ as well, since various features
(EOS, suppression, etc.) may require the newer message format.
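
As a rough illustration of the rule of thumb above, the sketch below compares
a broker version string against 0.11.0. The class and method names
(BrokerVersionCheck, compare, supportsHeaders) are hypothetical and not part
of any Kafka API, and the version string is assumed to be plain dotted
numbers supplied by whoever operates the cluster.

```java
/** Hypothetical helper, not a Kafka API: compares dotted numeric version strings. */
final class BrokerVersionCheck {

    // First broker release with the newer message format (headers), per this thread.
    static final String MIN_HEADER_VERSION = "0.11.0";

    /** Returns a negative, zero, or positive value as a is older than, equal to, or newer than b. */
    static int compare(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        int n = Math.max(x.length, y.length);
        for (int i = 0; i < n; i++) {
            // Missing components count as 0, so "0.11" equals "0.11.0".
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    /** True if the given broker version is 0.11.0 or newer. */
    static boolean supportsHeaders(String brokerVersion) {
        return compare(brokerVersion, MIN_HEADER_VERSION) >= 0;
    }
}
```

For the versions mentioned in this thread, supportsHeaders("0.10.2.1") is
false and supportsHeaders("0.11") is true.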

On Mon, Aug 26, 2019 at 5:42 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> I'm pretty sure one of the Suppress bug fixes that went into 2.2.1 involved
> adding headers. Updating the compatibility matrix must have just slipped
> when that bugfix was merged -- thanks for bringing this up!


-- 
-- Guozhang

Re: Kafka Streams and broker compatibility

Posted by Sophie Blee-Goldman <so...@confluent.io>.
I'm pretty sure one of the Suppress bug fixes that went into 2.2.1 involved
adding headers. Updating the compatibility matrix must have just slipped
when that bugfix was merged -- thanks for bringing this up!

On Mon, Aug 26, 2019 at 5:37 PM Alisson Sales <al...@gmail.com>
wrote:

> Hi Guozhang, thanks for your reply.
>
> I suspect the "problem" has to do with the fixes released on 2.2.1. I'm
> upgrading to this version mostly because we were facing problems with
> KTable suppress.
>
> I was experiencing this exact same problem:
>
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156
> This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895.
>
> When trying to confirm the fix worked for my topology/app I encountered the
> issue: java.lang.IllegalArgumentException: Magic v1 does not support
> record.
>
> In summary the topology works fine on 0.10.2.1 with kafka-streams 2.2.0,
> but fails with the error above if I use 2.2.1.
>
> I haven't changed any part of the code, simply updated my gradle file
> updating the dependency.
>
> Thanks again

Re: Kafka Streams and broker compatibility

Posted by Alisson Sales <al...@gmail.com>.
Hi Guozhang, thanks for your reply.

I suspect the "problem" is related to the fixes released in 2.2.1. I'm
upgrading to this version mostly because we were facing problems with
KTable suppress.

I was experiencing this exact problem:
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156
This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895.

When trying to confirm that the fix worked for my topology/app, I hit this
error: java.lang.IllegalArgumentException: Magic v1 does not support record
headers.

In summary, the topology works fine on broker 0.10.2.1 with kafka-streams
2.2.0, but fails with the error above if I use 2.2.1.

I haven't changed any part of the code; I only updated the kafka-streams
dependency version in my Gradle file.

Thanks again

On Tue, Aug 27, 2019 at 12:24 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Alisson,
>
> The root cause you've seen is the message header support, which is added in
> brokers as in 0.11.0 (KIP-82) and in streams client as in 2.0 (KIP-244). If
> your code does not add any more headers then it would only inherit the
> headers from source topics when trying to write to intermediate / sink
> topics. So I think that even if you were using 2.2.0 you'd still hit this
> issue if you happen to have headers in some of your source topic messages.
>
> I've updated
> https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix per
> the updates.
>
>
> Guozhang

Re: Kafka Streams and broker compatibility

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Alisson,

The root cause you've seen is message header support, which was added to
brokers in 0.11.0 (KIP-82) and to the Streams client in 2.0 (KIP-244). If
your code does not add any headers of its own, then it only inherits the
headers from source topics when writing to intermediate / sink
topics. So I think that even with 2.2.0 you'd still hit this
issue if some of your source topic messages happened to carry headers.
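
To make the failure mode concrete, here is a simplified, self-contained model
of the kind of check that produces the exception in the stack trace. It is a
sketch only and not the actual Kafka source: the class name RecordFormatCheck
and its validate method are made up for illustration, and headers are
modelled as a plain Map. The one fact it relies on is that message format
("magic") v2, introduced with 0.11.0, is the first format that can carry
record headers.

```java
import java.util.Map;

/** Simplified model of a producer-side format check; not the actual Kafka code. */
final class RecordFormatCheck {

    // Message format ("magic") versions. v1 is what 0.10.x brokers use;
    // v2 arrived with 0.11.0 and is the first to support record headers.
    static final byte MAGIC_V1 = 1;
    static final byte MAGIC_V2 = 2;

    /**
     * Rejects a record that carries headers when the batch is written in a
     * pre-v2 format. Records without headers pass on any format.
     */
    static void validate(byte magic, Map<String, byte[]> headers) {
        if (magic < MAGIC_V2 && headers != null && !headers.isEmpty()) {
            throw new IllegalArgumentException(
                "Magic v" + magic + " does not support record headers");
        }
    }
}
```

Under this model, a record with no headers passes with MAGIC_V1, which
matches a header-free topology running against an older broker, while any
record with at least one header is rejected until the format is v2.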

I've updated
https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
accordingly.


Guozhang



-- 
-- Guozhang