Posted to jira@kafka.apache.org by "Stanislav Kozlovski (JIRA)" <ji...@apache.org> on 2018/10/22 08:46:00 UTC

[jira] [Commented] (KAFKA-7525) Handling corrupt records

    [ https://issues.apache.org/jira/browse/KAFKA-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658747#comment-16658747 ] 

Stanislav Kozlovski commented on KAFKA-7525:
--------------------------------------------

Hi [~Solnica], thanks for the report!

Regarding 1. - there is ongoing work to change which errors are thrown when message corruption is detected. The issue we currently have is that we don't provide an easy way to seek past the corrupt records themselves. The relevant KIP is [KIP-344|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793].
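
To make the gap concrete, here is a minimal consumer-side sketch of the "seek past the record" workaround that the exception message suggests. This is an illustration only: badPartition is hypothetical, since the exception thrown today does not tell the application which partition or offset is corrupt.
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class SkipCorruptRecordSketch {
    // Minimal sketch of "seek past the record to continue consumption".
    // 'badPartition' has to come from outside (e.g. broker logs), because the
    // exception itself does not carry the corrupt partition/offset today.
    static void pollOnceSkippingCorruption(KafkaConsumer<byte[], byte[]> consumer,
                                           TopicPartition badPartition) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
            // ... hand the records off to the application ...
        } catch (KafkaException e) {
            // The consumer's position is still at the record it failed to fetch,
            // so seeking one past it skips the corrupt record (accepting data loss).
            long corruptOffset = consumer.position(badPartition);
            consumer.seek(badPartition, corruptOffset + 1);
        }
    }
}
{code}
Having to guess badPartition is exactly what makes this fragile, and it is part of what the KIP above aims to address.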


> Handling corrupt records
> ------------------------
>
>                 Key: KAFKA-7525
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7525
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer, core
>    Affects Versions: 1.1.0
>            Reporter: Katarzyna Solnica
>            Priority: Major
>
> When the Java consumer encounters a corrupt record on a partition it reads from, it throws:
> {code:java}
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from XYZ. If needed, please seek past the record to continue consumption.
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1125)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     (...)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size is less than the minimum record overhead (14){code}
> or:
> {code:java}
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
>     at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:936)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     (...){code}
> 1. Could you consider throwing CorruptRecordException from parseCompletedFetch() when error == Errors.CORRUPT_MESSAGE? (A sketch of the suggested mapping follows below.)
> 2. Seeking past the corrupt record means losing data. I've noticed that the record is often intact on a follower ISR, and manually changing the partition leader to that follower resolves the issue when the partition is consumed by a single consumer group. Couldn't the Kafka server detect such situations and recover corrupt records from the logs available on other ISRs?
>  
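> To illustrate point 1, a hypothetical sketch (not the actual Fetcher code) of the requested mapping: surface Errors.CORRUPT_MESSAGE as the dedicated CorruptRecordException instead of the generic IllegalStateException seen in the second stack trace above.
> {code:java}
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.errors.CorruptRecordException;
> import org.apache.kafka.common.protocol.Errors;
>
> // Hypothetical helper; the real change would live in Fetcher.parseCompletedFetch().
> final class FetchErrorMapper {
>     static RuntimeException toException(Errors error, TopicPartition tp, long fetchOffset) {
>         if (error == Errors.CORRUPT_MESSAGE)
>             // A dedicated, catchable exception lets the application seek past
>             // the record instead of failing on an IllegalStateException.
>             return new CorruptRecordException("Corrupt record at offset " + fetchOffset
>                     + " in partition " + tp + "; seek past it to continue consumption.");
>         return new IllegalStateException("Unexpected error code " + error.code()
>                 + " while fetching data");
>     }
> }
> {code}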



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)