Posted to users@kafka.apache.org by Shalom Sagges <sh...@gmail.com> on 2019/11/21 08:38:42 UTC

Magic v1 Does not Support Record Headers

Hi Experts,

I use Kafka 0.11.2

I have an issue where the Kafka logs are bombarded with the following error:
ERROR [KafkaApi-14733] Error when handling request {replica_id=-1,max_wait_time=0,min_bytes=0,max_bytes=2147483647,topics=[{topic=my_topic,partitions=[{partition=22,fetch_offset=1297798,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
        at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
        at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
        at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
        at scala.Option.map(Option.scala:146)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
        at scala.Option.flatMap(Option.scala:171)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
        at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
        at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
        at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
        at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
        at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
        at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:640)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
        at java.lang.Thread.run(Thread.java:745)


I understand this is probably caused by a client running a version that
isn't compatible with 0.11, but I don't know how to pinpoint which client
it is, since the topic is used by multiple consumers.
Any idea what this error actually means and how I can find the culprit?
I can't read anything in the logs besides this error :-S

Thanks a lot!

Re: Magic v1 Does not Support Record Headers

Posted by Shalom Sagges <sh...@gmail.com>.
Luckily, I was able to find that needle in a haystack. :-D
Thanks a lot for your guidance, Matthias; it helped me a lot in
understanding the issue.

On Mon, Nov 25, 2019 at 9:01 PM Matthias J. Sax <ma...@confluent.io>
wrote:


Re: Magic v1 Does not Support Record Headers

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Frankly, I am not entirely sure...

I would _assume_ that you could still change the message format, but I
would highly recommend trying it out in a non-production environment
first.

-Matthias

On 11/25/19 4:51 AM, Shalom Sagges wrote:


Re: Magic v1 Does not Support Record Headers

Posted by Shalom Sagges <sh...@gmail.com>.
Thanks a lot, Matthias!

This problematic topic has actually been mirrored from an older 0.8
version (using kafka-mirror).
I guess it's not possible to upgrade the message format in this case?

Thanks again for your help!

On Fri, Nov 22, 2019 at 7:32 AM Matthias J. Sax <ma...@confluent.io>
wrote:


Re: Magic v1 Does not Support Record Headers

Posted by "Matthias J. Sax" <ma...@confluent.io>.
It's going to be hard to find out which client it is. This is a known
issue in general, and there is a KIP that addresses it:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers
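
In the meantime, one way to narrow it down is the broker's built-in
request logger. A rough sketch (untested; the file paths, topic name, and
exact log-line format are assumptions you would need to adapt):

```shell
# Raise Kafka's request logger level so each request is logged with its
# clientId and source address. In config/log4j.properties set (a broker
# restart is needed on 0.11):
#
#   log4j.logger.kafka.request.logger=DEBUG, requestAppender
#
# Then count the client ids seen in requests touching the topic.
# Log path, topic name, and the clientId field format are assumptions:
grep 'my_topic' /var/log/kafka/kafka-request.log \
  | grep -o 'clientId=[^,;]*' | sort | uniq -c | sort -rn
```

The request log is very verbose, so turn the level back down once the
client is identified.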

The root cause of the error you see seems to be that the client tries
to write messages that include record headers. Record headers were added
in 0.11.0.0, so your brokers themselves support them.

However, it seems that the topic in question is still on message format
0.10, which does not support record headers. Note that broker version and
message format are independent of each other. You can see from the stack
trace that the broker tries to down-convert the message format (I
assume from 0.11 to 0.10 -- this down-conversion would succeed if record
headers were not used).

> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)

Thus, the client must either stop using record headers, or you need to
upgrade the message format to 0.11. See the docs for details about
upgrading the message format.
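
For the second option, something like the stock kafka-configs tool should
work (a sketch only -- the ZooKeeper address and topic name are
assumptions, and try it in a non-production cluster first):

```shell
# Sketch: inspect and raise a topic's message format with kafka-configs.sh.
# ZooKeeper address and topic name are assumptions; adapt to your cluster.

# Show current per-topic overrides (message.format.version appears here
# only if set at the topic level; otherwise the broker-wide
# log.message.format.version applies):
bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
  --entity-type topics --entity-name my_topic

# Upgrade the topic's message format to 0.11 so batches with record
# headers no longer need down-conversion:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my_topic \
  --add-config message.format.version=0.11.0
```

Note that older consumers cannot read the newer format, so only upgrade
once all clients of the topic are on 0.11-capable versions.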


Hope that helps.


-Matthias


On 11/21/19 12:38 AM, Shalom Sagges wrote: